| """ |
| Extracts the statistical signature of human writing vs AI writing. |
| Uses Kaggle datasets to build: |
| |
| 1. HumanPatternProfile — a statistical distribution of human writing features |
| 2. AIPatternProfile — a statistical distribution of AI writing features |
| 3. HumanPatternClassifier — a lightweight FROZEN classifier used at training time |
| to score how "human-like" the model's output looks. |
| |
| The classifier is FROZEN during main model training. It is pre-trained separately |
| on the Kaggle datasets, then its output score is used as a reward/penalty signal |
| in the main training loss. |
| |
| Feature set extracted (17 dimensions): |
| - Perplexity under GPT-2 (AI text tends to be lower perplexity) |
| - Burstiness score (human writing has more sentence length variance) |
| - Sentence starter diversity |
| - n-gram novelty scores (bigram, trigram, 4-gram) |
| - AI marker density |
| - Overused discourse density |
| - Punctuation patterns (em-dash, ellipsis, comma, semicolon rates) |
| - Distributional features (word count, sentence count, mean/std sent length, TTR) |
| """ |
|
|
import math
import multiprocessing as mp
import re
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import spacy
import torch
import torch.nn as nn
import torch.nn.functional as F
from loguru import logger
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset
from transformers import GPT2LMHeadModel, GPT2TokenizerFast
|
|
|
|
# Discourse markers and buzzwords that LLM-generated text tends to overuse.
# Matched as substrings of the lowercased text, so inflected forms
# ("delves", "leveraging") also count.
AI_OVERUSED_MARKERS = {
    "furthermore", "moreover", "additionally", "consequently",
    "in conclusion", "to summarize", "it is worth noting",
    "it is important to note", "in today's world", "in today's society",
    "in the modern era", "as previously mentioned", "needless to say",
    "it goes without saying", "at the end of the day",
    "in terms of", "with regard to", "with respect to",
    "delve", "leverage", "utilize", "holistic", "paradigm",
    "transformative", "groundbreaking", "revolutionary", "game-changing",
    "multifaceted", "nuanced", "comprehensive", "robust", "seamless",
    "innovative", "synergy", "cutting-edge", "state-of-the-art",
}
|
|
# Individual words that fingerprint LLM output. Matched against the set of
# lowercased, punctuation-stripped tokens, so each word counts at most once
# per text (distinct-hit density, not occurrence density).
AI_FINGERPRINT_WORDS = {
    "delve", "underscore", "tapestry", "intricate", "pivotal",
    "crucial", "vital", "essential", "significant", "notable",
    "commendable", "noteworthy", "straightforward", "straightforwardly",
    "elucidate", "expound", "illuminate", "unravel", "harness",
    "foster", "facilitate", "leverage", "optimize", "streamline",
}
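

# Hedged sanity-check sketch (not invoked anywhere in this module): eyeballs the
# two marker sets on toy strings. The sample sentences are illustrative
# inventions, not drawn from the Kaggle datasets.
def _demo_marker_sets():
    ai_like = "We must leverage a holistic, cutting-edge paradigm and delve into synergy."
    human_like = "I missed the bus again, so I walked home. It rained the whole way."
    for name, sample in (("ai_like", ai_like), ("human_like", human_like)):
        lower = sample.lower()
        tokens = {w.strip(".,;:!?\"'") for w in lower.split()}
        fingerprint_hits = len(tokens & AI_FINGERPRINT_WORDS)
        discourse_hits = sum(1 for m in AI_OVERUSED_MARKERS if m in lower)
        print(f"{name}: fingerprint={fingerprint_hits} discourse={discourse_hits}")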
|
|
|
|
def _compute_text_features(text: str) -> np.ndarray:
    """Compute the 16 non-perplexity features from raw text.
    Returns a 16-dim float32 array (features 2-17 of the final vector; the
    perplexity slot, feature 1, is excluded).
    This function is designed to be called in a worker process.
    """
    if not text or not text.strip():
        return np.zeros(16, dtype=np.float32)

    words = text.split()
    word_count = max(len(words), 1)

    # Regex sentence split: cheap, dependency-free, and safe in worker processes.
    raw_sents = re.split(r'(?<=[.!?])\s+', text.strip())
    sentences = [s.strip() for s in raw_sents if s.strip()]
    sent_lengths = [len(s.split()) for s in sentences] if sentences else [0]

    features = []

    # 1. Burstiness: coefficient of variation of sentence lengths.
    if len(sentences) < 2:
        features.append(0.0)
    else:
        mean_len = np.mean(sent_lengths)
        features.append(float(np.std(sent_lengths) / mean_len) if mean_len > 0 else 0.0)

    # 2. Sentence-starter diversity: unique first words / number of sentences.
    if not sentences:
        features.append(0.0)
    else:
        starters = [s.split()[0].lower() for s in sentences if s.split()]
        features.append(len(set(starters)) / len(starters) if starters else 0.0)

    # 3-5. n-gram novelty: unique n-grams / total n-grams for n = 2, 3, 4.
    words_lower = [w.lower() for w in words]
    for n in (2, 3, 4):
        if len(words_lower) < n:
            features.append(1.0)
        else:
            ngrams = [tuple(words_lower[i:i + n]) for i in range(len(words_lower) - n + 1)]
            features.append(len(set(ngrams)) / len(ngrams) if ngrams else 1.0)

    # 6. AI fingerprint-word density (distinct hits per 100 words).
    # Punctuation is stripped so "delve," still matches "delve".
    word_set = {w.strip(".,;:!?\"'") for w in words_lower}
    ai_count = len(word_set & AI_FINGERPRINT_WORDS)
    features.append((ai_count / word_count) * 100)

    # 7. Overused discourse-marker density (substring matches per 100 words).
    text_lower = text.lower()
    discourse_count = sum(1 for marker in AI_OVERUSED_MARKERS if marker in text_lower)
    features.append((discourse_count / word_count) * 100)

    # 8-11. Punctuation rates per 100 words: em/en dashes, ellipses, commas, semicolons.
    features.append((text.count("—") + text.count("–")) / word_count * 100)
    features.append(text.count("...") / word_count * 100)
    features.append(text.count(",") / word_count * 100)
    features.append(text.count(";") / word_count * 100)

    # 12-13. Log-scaled word and sentence counts.
    features.append(np.log1p(word_count))
    features.append(np.log1p(len(sentences)))

    # 14-15. Mean and standard deviation of sentence length.
    features.append(np.mean(sent_lengths))
    features.append(np.std(sent_lengths) if len(sent_lengths) > 1 else 0.0)

    # 16. Type-token ratio (lexical diversity).
    features.append(len(set(words_lower)) / word_count)

    return np.array(features, dtype=np.float32)
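

# Hedged sanity-check sketch (not invoked anywhere in this module): confirms the
# shape and rough behaviour of _compute_text_features on a toy input. The sample
# string and the printed slots are illustrative, not golden values.
def _demo_text_features():
    sample = "Short one. Then a much longer, rambling sentence follows it, doesn't it?"
    vec = _compute_text_features(sample)
    assert vec.shape == (16,) and vec.dtype == np.float32
    # Index 0 = burstiness, 1 = starter diversity, 15 = type-token ratio.
    print("burstiness:", vec[0], "starter diversity:", vec[1], "TTR:", vec[15])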
|
|
|
|
class HumanPatternFeatureExtractor:
    """Extracts a 17-dimensional feature vector encoding human vs AI writing patterns.

    Optimised for bulk extraction:
    - GPT-2 perplexity computed in batches on GPU (if available)
    - Text features computed in parallel via multiprocessing
    """

    def __init__(self, spacy_model: str = "en_core_web_sm", device: Optional[str] = None):
        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device

        # NOTE: spacy_model is accepted for API compatibility but not currently
        # loaded; sentence splitting in _compute_text_features is regex-based.
        self.spacy_model = spacy_model

        logger.info("Loading GPT-2 for perplexity calculation...")
        self.gpt2_model = GPT2LMHeadModel.from_pretrained("gpt2")
        self.gpt2_tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
        self.gpt2_tokenizer.pad_token = self.gpt2_tokenizer.eos_token
        self.gpt2_model.eval()
        self.gpt2_model = self.gpt2_model.to(self.device)

        # fp16 halves VRAM use and speeds up inference on GPU.
        if self.device == "cuda":
            self.gpt2_model = self.gpt2_model.half()
            logger.info(f"GPT-2 loaded on {self.device} with fp16")
        else:
            logger.info(f"GPT-2 loaded on {self.device}")

        logger.info("HumanPatternFeatureExtractor initialised")

    def _perplexity(self, text: str, max_len: int = 256) -> float:
        """GPT-2 perplexity for a single text. Lower = more AI-like."""
        try:
            encodings = self.gpt2_tokenizer(
                text, return_tensors="pt", truncation=True, max_length=max_len
            )
            input_ids = encodings["input_ids"].to(self.device)

            # Need at least two tokens to compute a next-token loss.
            if input_ids.size(1) < 2:
                return 100.0

            with torch.no_grad():
                outputs = self.gpt2_model(input_ids, labels=input_ids)
                loss = outputs.loss

            # Cap the loss at 10 so perplexity stays finite (at most e^10 ≈ 22026).
            return math.exp(min(loss.float().item(), 10))
        except Exception:
            # Fallback for tokeniser/model failures: a neutral mid-range value.
            return 100.0
|
|
    def _perplexity_batch(self, texts: List[str], max_len: int = 256, batch_size: int = 8) -> List[float]:
        """Compute GPT-2 perplexity for a batch of texts efficiently on GPU.

        Processes texts in mini-batches with padding for maximum throughput.
        Default batch_size=8 is sized for GPUs with ~4GB VRAM (e.g. RTX 3050).
        """
        results = []

        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]

            encodings = self.gpt2_tokenizer(
                batch_texts,
                return_tensors="pt",
                truncation=True,
                max_length=max_len,
                padding=True,
            )
            input_ids = encodings["input_ids"].to(self.device)
            attention_mask = encodings["attention_mask"].to(self.device)

            # autocast wants a bare device type ("cuda"/"cpu"), not e.g. "cuda:0".
            autocast_device = "cuda" if self.device.startswith("cuda") else "cpu"
            with torch.no_grad(), torch.amp.autocast(device_type=autocast_device):
                outputs = self.gpt2_model(
                    input_ids,
                    attention_mask=attention_mask,
                )
                logits = outputs.logits

            # Shift so token t is predicted from tokens < t; the final logit
            # has no label and is dropped.
            shift_logits = logits[:, :-1, :].contiguous()
            shift_labels = input_ids[:, 1:].contiguous()
            shift_mask = attention_mask[:, 1:].contiguous()

            # Per-token cross-entropy, then mask out padding so each sample's
            # loss averages only over its real tokens.
            loss_fct = nn.CrossEntropyLoss(reduction="none")
            per_token_loss = loss_fct(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1),
            ).view(shift_labels.size())

            masked_loss = per_token_loss * shift_mask.float()
            token_counts = shift_mask.float().sum(dim=1).clamp(min=1)
            per_sample_loss = masked_loss.sum(dim=1) / token_counts

            # Same loss cap as the single-text path.
            for loss_val in per_sample_loss:
                ppl = math.exp(min(loss_val.float().item(), 10))
                results.append(ppl)

            # Free intermediate tensors between mini-batches to keep VRAM flat.
            del input_ids, attention_mask, outputs, logits, shift_logits, shift_labels
            del shift_mask, per_token_loss, masked_loss, token_counts, per_sample_loss
            if self.device == "cuda":
                torch.cuda.empty_cache()

        return results
|
|
    def extract(self, text: str) -> np.ndarray:
        """Extract the full 17-dimensional feature vector for a single text."""
        if not text or not text.strip():
            return np.zeros(17, dtype=np.float32)

        # Feature 1: GPT-2 perplexity.
        ppl = self._perplexity(text)

        # Features 2-17: statistical text features.
        text_features = _compute_text_features(text)

        features = np.empty(17, dtype=np.float32)
        features[0] = ppl
        features[1:] = text_features

        return features
|
|
    def extract_batch(
        self,
        texts: List[str],
        batch_size: Optional[int] = None,
        num_workers: int = 0,
        progress_every: int = 1000,
    ) -> np.ndarray:
        """Extract features for many texts efficiently.

        Strategy:
        1. Compute perplexity in batched GPU forward passes
        2. Compute text features in parallel via multiprocessing
        3. Merge into an (N, 17) array

        Args:
            texts: List of text strings
            batch_size: Batch size for GPT-2 perplexity (default 8 for ~4GB VRAM GPUs)
            num_workers: Number of processes for text features. 0 = auto-detect.
            progress_every: Log progress every N texts

        Returns:
            np.ndarray of shape (len(texts), 17)
        """
        n = len(texts)
        if batch_size is None:
            # Heuristic: roughly one sample per GB of VRAM, clamped to [4, 32].
            if self.device == "cuda":
                vram_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3)
                batch_size = max(4, min(32, int(vram_gb)))
            else:
                batch_size = 4
        logger.info(f"Extracting features for {n} texts (device={self.device}, batch_size={batch_size})")

        # Stage 1: batched GPT-2 perplexity (outer loop exists for progress logs).
        logger.info("  Computing batched GPT-2 perplexity...")
        all_ppl = []
        for start in range(0, n, batch_size):
            end = min(start + batch_size, n)
            batch = texts[start:end]
            all_ppl.extend(self._perplexity_batch(batch, batch_size=len(batch)))

            if (start // batch_size) % max(1, (progress_every // batch_size)) == 0 and start > 0:
                logger.info(f"  Perplexity: {start}/{n}")

        logger.info(f"  Perplexity complete: {n}/{n}")

        # Stage 2: CPU text features, in parallel where it pays off.
        logger.info("  Computing text features (parallel)...")
        if num_workers == 0:
            num_workers = min(mp.cpu_count(), 8)

        # Process-pool startup costs more than it saves on small inputs.
        if n < 500 or num_workers <= 1:
            text_features_list = []
            for i, text in enumerate(texts):
                text_features_list.append(_compute_text_features(text))
                if i > 0 and i % progress_every == 0:
                    logger.info(f"  Text features: {i}/{n}")
        else:
            # Chunked submission keeps memory bounded and lets us log progress.
            text_features_list = []
            with ProcessPoolExecutor(max_workers=num_workers) as executor:
                chunk_size = 2000
                for chunk_start in range(0, n, chunk_size):
                    chunk_end = min(chunk_start + chunk_size, n)
                    chunk = texts[chunk_start:chunk_end]
                    chunk_results = list(executor.map(_compute_text_features, chunk, chunksize=200))
                    text_features_list.extend(chunk_results)
                    if chunk_start > 0:
                        logger.info(f"  Text features: {chunk_start}/{n}")

        logger.info(f"  Text features complete: {n}/{n}")

        # Stage 3: merge perplexity (column 0) with the 16 text features.
        features = np.empty((n, 17), dtype=np.float32)
        features[:, 0] = np.array(all_ppl, dtype=np.float32)
        features[:, 1:] = np.array(text_features_list, dtype=np.float32)

        return features
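

# Hedged usage sketch (not invoked anywhere in this module): instantiates the
# extractor on CPU and checks that the batched perplexity path roughly agrees
# with the single-text path. Small numeric drift from autocast is expected; the
# sample texts and the loose "rough agreement" framing are illustrative
# assumptions, not tested bounds.
def _demo_extractor_parity():
    extractor = HumanPatternFeatureExtractor(device="cpu")
    texts = [
        "The cat sat on the mat and watched the rain.",
        "Furthermore, it is important to note that synergy is transformative.",
    ]
    feats = extractor.extract_batch(texts, num_workers=1)
    print("feature matrix:", feats.shape)  # (2, 17)
    for t, row in zip(texts, feats):
        single_ppl = extractor._perplexity(t)
        print(f"ppl batched={row[0]:.1f} single={single_ppl:.1f}")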
|
|
|
|
class KaggleHumanPatternDataset(Dataset):
    """
    Loads both Kaggle datasets and produces (feature_vector, label) pairs.
    label = 1 (human) | 0 (AI)
    """

    # Column names we recognise in the source CSV/parquet files.
    TEXT_COLUMNS = ("text", "essay_text", "content", "essay")
    LABEL_COLUMNS = ("generated", "label", "is_ai", "ai_generated", "class", "source")

    def __init__(
        self,
        shanegerami_path: str,
        starblasters_path: str,
        extractor: HumanPatternFeatureExtractor,
        max_samples_per_source: int = 50000,
    ):
        self.extractor = extractor
        self.texts = []
        self.labels = []

        logger.info(f"Loading Shanegerami dataset from {shanegerami_path}...")
        try:
            # NOTE: nrows assumes human/AI rows are interleaved; if the CSV is
            # sorted by label, one class may be under-represented.
            df_shane = pd.read_csv(shanegerami_path, nrows=max_samples_per_source * 2)
            self._add_samples(df_shane, max_samples_per_source, "Shanegerami")
        except Exception as e:
            logger.warning(f"Failed to load Shanegerami dataset: {e}")

        logger.info(f"Loading Starblasters8 dataset from {starblasters_path}...")
        try:
            df_star = pd.read_parquet(starblasters_path)
            self._add_samples(df_star, max_samples_per_source, "Starblasters8")
        except Exception as e:
            logger.warning(f"Failed to load Starblasters8 dataset: {e}")

        logger.info(f"Total dataset size: {len(self.texts)} samples")

        self._features = None
        self._precomputed = False

    def _add_samples(self, df: pd.DataFrame, max_samples: int, source_name: str):
        """Detect text/label columns, then append up to max_samples per class.

        Assumes the source label column uses 0 = human, 1 = AI; rows with other
        label values (e.g. string labels) are silently skipped.
        """
        text_col = None
        label_col = None
        for col in df.columns:
            col_lower = col.lower()
            if col_lower in self.TEXT_COLUMNS:
                text_col = col
            elif col_lower in self.LABEL_COLUMNS:
                label_col = col

        if text_col is None:
            text_col = df.columns[0]
            logger.warning(f"Auto-detected text column: {text_col}")
        if label_col is None:
            label_col = df.columns[-1]
            logger.warning(f"Auto-detected label column: {label_col}")

        human_mask = df[label_col] == 0
        ai_mask = df[label_col] == 1

        human_texts = df.loc[human_mask, text_col].dropna().head(max_samples).tolist()
        ai_texts = df.loc[ai_mask, text_col].dropna().head(max_samples).tolist()

        # Internal convention is the inverse of the source files: 1 = human, 0 = AI.
        self.texts.extend(human_texts)
        self.labels.extend([1] * len(human_texts))
        self.texts.extend(ai_texts)
        self.labels.extend([0] * len(ai_texts))

        logger.info(f"{source_name}: {len(human_texts)} human + {len(ai_texts)} AI samples")

    def precompute_features(self):
        """Pre-compute all features using optimised batched extraction."""
        if self._precomputed:
            return

        logger.info("Pre-computing features for all texts...")

        # Truncate to 2000 chars: enough signal, bounds tokeniser and regex cost.
        truncated_texts = [str(text)[:2000] for text in self.texts]

        features_array = self.extractor.extract_batch(
            truncated_texts,
            batch_size=None,
            num_workers=0,
            progress_every=2000,
        )

        self._features = [features_array[i] for i in range(len(features_array))]
        self._precomputed = True
        logger.info("Feature pre-computation complete")

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
        if self._precomputed and self._features is not None:
            features = self._features[idx]
        else:
            # Fallback: extract on the fly (slow; prefer precompute_features()).
            text = str(self.texts[idx])[:2000]
            features = self.extractor.extract(text)

        features_tensor = torch.tensor(features, dtype=torch.float32)

        # Guard against NaN/inf from degenerate texts before they reach the model.
        features_tensor = torch.nan_to_num(features_tensor, nan=0.0, posinf=10.0, neginf=-10.0)

        return features_tensor, self.labels[idx]
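

# Hedged usage sketch (not invoked anywhere in this module): wires the dataset
# into a DataLoader. The file paths are placeholders for wherever the Kaggle
# downloads live; precompute_features() is called first so __getitem__ stays cheap.
def _demo_dataset_loader():
    extractor = HumanPatternFeatureExtractor()
    dataset = KaggleHumanPatternDataset(
        shanegerami_path="data/ai_vs_human.csv",        # placeholder path
        starblasters_path="data/human_vs_llm.parquet",  # placeholder path
        extractor=extractor,
        max_samples_per_source=1000,
    )
    dataset.precompute_features()
    loader = DataLoader(dataset, batch_size=64, shuffle=True)
    features, labels = next(iter(loader))
    print(features.shape, labels.shape)  # expect (batch, 17) and (batch,)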
|
|
|
|
class HumanPatternClassifier(nn.Module):
    """
    Lightweight MLP trained to distinguish human from AI writing.
    Input: feature vector from HumanPatternFeatureExtractor
    Output: probability that the text is human-written (0 to 1)

    PRE-TRAINED on the Kaggle datasets, then FROZEN during main training.
    """

    def __init__(self, input_dim: int = 17, hidden_dim: int = 128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim // 2, 1),
        )

    def forward(self, features: torch.Tensor) -> torch.Tensor:
        """Returns a human-likeness score in [0, 1]. Higher = more human."""
        logits = self.net(features)
        return torch.sigmoid(logits).squeeze(-1)

    def score(self, text: str, extractor: HumanPatternFeatureExtractor) -> float:
        """Convenience: score a single text string."""
        self.eval()  # eval mode so BatchNorm uses running stats on a batch of 1
        features = extractor.extract(text)
        features_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0)
        features_tensor = torch.nan_to_num(features_tensor, nan=0.0, posinf=10.0, neginf=-10.0)
        # Match the model's device so this works when the classifier lives on GPU.
        features_tensor = features_tensor.to(next(self.parameters()).device)
        with torch.no_grad():
            return self.forward(features_tensor).item()
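

# Hedged end-to-end sketch (not invoked anywhere in this module): pre-train the
# classifier with BCE on the Kaggle features, freeze it, then read its score as
# a reward signal. The optimiser settings, epoch count, and the reward-weighting
# comment at the bottom are illustrative assumptions; the real main training
# loop lives elsewhere.
def _demo_pretrain_and_freeze(dataset: KaggleHumanPatternDataset) -> HumanPatternClassifier:
    dataset.precompute_features()
    # drop_last avoids a size-1 final batch, which BatchNorm1d rejects in train mode.
    loader = DataLoader(dataset, batch_size=256, shuffle=True, drop_last=True)

    clf = HumanPatternClassifier()
    optimiser = torch.optim.Adam(clf.parameters(), lr=1e-3)

    clf.train()
    for _epoch in range(3):  # illustrative epoch count
        for features, labels in loader:
            optimiser.zero_grad()
            probs = clf(features)
            loss = F.binary_cross_entropy(probs, labels.float())
            loss.backward()
            optimiser.step()

    # Freeze: no gradients flow into the classifier during main training.
    clf.eval()
    for p in clf.parameters():
        p.requires_grad_(False)

    # Reward/penalty use (sketch): higher human-likeness lowers an auxiliary
    # loss term, e.g. human_pattern_loss = reward_weight * (1.0 - clf(feats)).
    return clf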
|
|