| | |
| | """ |
| | Script to process CTI-bench TSV files into Hugging Face datasets with comprehensive README documentation. |
| | """ |
| |
|
| | import pandas as pd |
| | import os |
| | from pathlib import Path |
| | from datasets import Dataset |
| | from huggingface_hub import HfApi, login |
| | import argparse |
| | import logging |
| | import tempfile |
| |
|
| | |
# Configure root logging at INFO so per-dataset progress is visible on the console.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by all processing/upload helpers below.
logger = logging.getLogger(__name__)
| |
|
def generate_mcq_readme(dataset_size):
    """Build the README.md markdown for the MCQ dataset card.

    Args:
        dataset_size: Number of examples; interpolated (comma-grouped)
            into the description text.

    Returns:
        The complete README contents as a single markdown string.
    """
    # Doubled braces ({{ }}) render as literal braces in the emitted markdown.
    readme = f"""# CTI-Bench: Multiple Choice Questions (MCQ)

## Dataset Description

This dataset contains **{dataset_size:,} multiple choice questions** focused on cybersecurity knowledge, particularly based on the MITRE ATT&CK framework. It's part of the CTI-Bench suite for evaluating Large Language Models on Cyber Threat Intelligence tasks.

## Dataset Structure

Each example contains:
- **url**: Source URL (typically MITRE ATT&CK technique pages)
- **question**: The cybersecurity question
- **option_a**: First multiple choice option
- **option_b**: Second multiple choice option
- **option_c**: Third multiple choice option
- **option_d**: Fourth multiple choice option
- **prompt**: Full prompt with instructions for the model
- **ground_truth**: Correct answer (A, B, C, or D)
- **task_type**: Always "multiple_choice_question"

## Usage

```python
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_mcq")

# Access a sample
sample = dataset['train'][0]
print(f"Question: {{sample['question']}}")
print(f"Options: A) {{sample['option_a']}}, B) {{sample['option_b']}}")
print(f"Answer: {{sample['ground_truth']}}")
```

## Example

**Question:** Which of the following mitigations involves preventing applications from running that haven't been downloaded from legitimate repositories?

**Options:**
- A) Audit
- B) Execution Prevention
- C) Operating System Configuration
- D) User Account Control

**Answer:** B

## Citation

If you use this dataset, please cite the original CTI-Bench paper:

```bibtex
@article{{ctibench2024,
  title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
  author={{[Authors]}},
  journal={{NeurIPS 2024}},
  year={{2024}}
}}
```

## Original Source

This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.

## Tasks

This dataset is designed for:
- ✅ Multiple choice question answering
- ✅ Cybersecurity knowledge evaluation
- ✅ MITRE ATT&CK framework understanding
- ✅ Model benchmarking on CTI tasks
"""
    return readme
| |
|
def generate_ate_readme(dataset_size):
    """Build the README.md markdown for the ATE dataset card.

    Args:
        dataset_size: Number of examples; interpolated into the text as-is.

    Returns:
        The complete README contents as a single markdown string.
    """
    # Doubled braces ({{ }}) render as literal braces in the emitted markdown.
    readme = f"""# CTI-Bench: Attack Technique Extraction (ATE)

## Dataset Description

This dataset contains **{dataset_size} examples** for extracting MITRE Enterprise attack technique IDs from malware and attack descriptions. It tests a model's ability to map cybersecurity descriptions to specific MITRE ATT&CK techniques.

## Dataset Structure

Each example contains:
- **url**: Source URL (typically MITRE software/malware pages)
- **platform**: Target platform (Enterprise, Mobile, etc.)
- **description**: Detailed description of the malware or attack technique
- **prompt**: Full instruction prompt with MITRE technique reference list
- **ground_truth**: Comma-separated list of main MITRE technique IDs (e.g., "T1071, T1573, T1083")
- **task_type**: Always "attack_technique_extraction"

## Usage

```python
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_ate")

# Access a sample
sample = dataset['train'][0]
print(f"Description: {{sample['description']}}")
print(f"MITRE Techniques: {{sample['ground_truth']}}")
```

## Example

**Description:** 3PARA RAT is a remote access tool (RAT) developed in C++ and associated with the group Putter Panda. It communicates with its command and control (C2) servers via HTTP, with commands encrypted using the DES algorithm in CBC mode...

**Expected Output:** T1071, T1573, T1083, T1070

## MITRE ATT&CK Techniques

The dataset covers techniques such as:
- **T1071**: Application Layer Protocol
- **T1573**: Encrypted Channel
- **T1083**: File and Directory Discovery
- **T1105**: Ingress Tool Transfer
- And many more...

## Citation

```bibtex
@article{{ctibench2024,
  title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
  author={{[Authors]}},
  journal={{NeurIPS 2024}},
  year={{2024}}
}}
```

## Original Source

This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.

## Tasks

This dataset is designed for:
- ✅ Named entity recognition (MITRE technique IDs)
- ✅ Information extraction from cybersecurity text
- ✅ MITRE ATT&CK framework mapping
- ✅ Threat intelligence analysis
"""
    return readme
| |
|
def generate_vsp_readme(dataset_size):
    """Build the README.md markdown for the VSP dataset card.

    Args:
        dataset_size: Number of examples; interpolated (comma-grouped)
            into the description text.

    Returns:
        The complete README contents as a single markdown string.
    """
    # Doubled braces ({{ }}) render as literal braces in the emitted markdown.
    readme = f"""# CTI-Bench: Vulnerability Severity Prediction (VSP)

## Dataset Description

This dataset contains **{dataset_size:,} CVE descriptions** with corresponding CVSS v3.1 base scores. It evaluates a model's ability to assess vulnerability severity and generate proper CVSS vector strings.

## Dataset Structure

Each example contains:
- **url**: CVE URL (typically from nvd.nist.gov)
- **description**: CVE description detailing the vulnerability
- **prompt**: Full instruction prompt explaining CVSS v3.1 metrics
- **cvss_vector**: Ground truth CVSS v3.1 vector string (e.g., "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H")
- **task_type**: Always "vulnerability_severity_prediction"

## CVSS v3.1 Metrics

The dataset covers all base metrics:
- **AV** (Attack Vector): Network (N), Adjacent (A), Local (L), Physical (P)
- **AC** (Attack Complexity): Low (L), High (H)
- **PR** (Privileges Required): None (N), Low (L), High (H)
- **UI** (User Interaction): None (N), Required (R)
- **S** (Scope): Unchanged (U), Changed (C)
- **C** (Confidentiality): None (N), Low (L), High (H)
- **I** (Integrity): None (N), Low (L), High (H)
- **A** (Availability): None (N), Low (L), High (H)

## Usage

```python
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_vsp")

# Access a sample
sample = dataset['train'][0]
print(f"CVE: {{sample['description']}}")
print(f"CVSS Vector: {{sample['cvss_vector']}}")
```

## Example

**CVE Description:** In the Linux kernel through 6.7.1, there is a use-after-free in cec_queue_msg_fh, related to drivers/media/cec/core/cec-adap.c...

**CVSS Vector:** CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:N/I:N/A:H

## Citation

```bibtex
@article{{ctibench2024,
  title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
  author={{[Authors]}},
  journal={{NeurIPS 2024}},
  year={{2024}}
}}
```

## Original Source

This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.

## Tasks

This dataset is designed for:
- ✅ Vulnerability severity assessment
- ✅ CVSS score calculation
- ✅ Risk analysis and prioritization
- ✅ Cybersecurity impact evaluation
"""
    return readme
| |
|
def generate_taa_readme(dataset_size):
    """Build the README.md markdown for the TAA dataset card.

    Args:
        dataset_size: Number of examples; interpolated into the text as-is.

    Returns:
        The complete README contents as a single markdown string.
    """
    # Doubled braces ({{ }}) render as literal braces in the emitted markdown.
    readme = f"""# CTI-Bench: Threat Actor Attribution (TAA)

## Dataset Description

This dataset contains **{dataset_size} examples** for threat actor attribution tasks. It evaluates a model's ability to identify and attribute cyber attacks to specific threat actors based on attack patterns, techniques, and indicators.

## Dataset Structure

Each example contains:
- **task_type**: Always "threat_actor_attribution"
- Additional fields vary based on the specific attribution task
- Common fields include threat descriptions, attack patterns, and attribution targets

## Usage

```python
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_taa")

# Access a sample
sample = dataset['train'][0]
print(f"Task: {{sample['task_type']}}")
```

## Attribution Categories

The dataset may cover attribution to:
- **APT Groups**: Advanced Persistent Threat organizations
- **Nation-State Actors**: Government-sponsored cyber units
- **Cybercriminal Organizations**: Profit-motivated threat groups
- **Hacktivist Groups**: Ideologically motivated actors

## Citation

```bibtex
@article{{ctibench2024,
  title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
  author={{[Authors]}},
  journal={{NeurIPS 2024}},
  year={{2024}}
}}
```

## Original Source

This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.

## Tasks

This dataset is designed for:
- ✅ Threat actor identification
- ✅ Attribution analysis
- ✅ Attack pattern recognition
- ✅ Intelligence correlation
"""
    return readme
| |
|
def generate_rcm_readme(dataset_size, variant=""):
    """Build the README.md markdown for an RCM dataset card.

    Args:
        dataset_size: Number of examples; interpolated (comma-grouped)
            into the description text.
        variant: Optional variant label (e.g. "2021"); when non-empty it is
            appended to the title and, if it contains "2021", switches the
            usage snippet to the ``cti_bench_rcm_2021`` repo id.

    Returns:
        The complete README contents as a single markdown string.
    """
    if variant:
        variant_text = f" ({variant})"
    else:
        variant_text = ""
    # Doubled braces ({{ }}) render as literal braces in the emitted markdown;
    # the single-braced conditional inside the Usage snippet IS interpolated.
    readme = f"""# CTI-Bench: Reverse Cyber Mapping (RCM){variant_text}

## Dataset Description

This dataset contains **{dataset_size:,} examples** for reverse cyber mapping tasks. It evaluates a model's ability to work backwards from observed indicators or effects to identify the underlying attack techniques, tools, or threat actors.

## Dataset Structure

Each example contains:
- **task_type**: Always "reverse_cyber_mapping"
- Additional fields vary based on the specific mapping task
- Common fields include indicators, observables, and mapping targets

## Usage

```python
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("tuandunghcmut/cti_bench_rcm{'_2021' if '2021' in variant else ''}")

# Access a sample
sample = dataset['train'][0]
print(f"Task: {{sample['task_type']}}")
```

## Reverse Mapping Categories

The dataset may include mapping from:
- **Indicators of Compromise (IoCs)** → Attack techniques
- **Network signatures** → Malware families
- **Attack patterns** → Threat actors
- **Behavioral analysis** → MITRE techniques

## Citation

```bibtex
@article{{ctibench2024,
  title={{CTIBench: A Benchmark for Evaluating LLMs in Cyber Threat Intelligence}},
  author={{[Authors]}},
  journal={{NeurIPS 2024}},
  year={{2024}}
}}
```

## Original Source

This dataset is derived from [CTI-Bench](https://github.com/xashru/cti-bench) and is available under the same license terms.

## Tasks

This dataset is designed for:
- ✅ Reverse engineering of attack chains
- ✅ Indicator-to-technique mapping
- ✅ Threat hunting and investigation
- ✅ Forensic analysis
"""
    return readme
| |
|
def process_mcq_dataset(file_path):
    """Convert the MCQ TSV file into a Hugging Face Dataset.

    Args:
        file_path: Path to ``cti-mcq.tsv`` (tab-separated, with URL /
            Question / Option A-D / Prompt / GT columns).

    Returns:
        datasets.Dataset with one record per question, all fields as
        strings plus a constant ``task_type``.

    Raises:
        KeyError: If any expected column is missing from the TSV.
    """
    logger.info(f"Processing MCQ dataset: {file_path}")

    # dtype=str preserves every cell exactly as written in the TSV (the old
    # per-cell str() coerced pandas-inferred numerics, e.g. "1" -> "1.0");
    # fillna('') matches the original pd.notna fallback for missing cells.
    df = pd.read_csv(file_path, sep='\t', dtype=str).fillna('')

    # TSV header -> output field name.
    column_map = {
        'URL': 'url',
        'Question': 'question',
        'Option A': 'option_a',
        'Option B': 'option_b',
        'Option C': 'option_c',
        'Option D': 'option_d',
        'Prompt': 'prompt',
        'GT': 'ground_truth',
    }
    # Select/rename in one pass and let pandas build the records in C,
    # instead of a Python-level iterrows loop.
    processed_data = (
        df[list(column_map)]
        .rename(columns=column_map)
        .to_dict(orient='records')
    )
    for entry in processed_data:
        entry['task_type'] = 'multiple_choice_question'

    return Dataset.from_list(processed_data)
| |
|
def process_ate_dataset(file_path):
    """Convert the ATE TSV file into a Hugging Face Dataset.

    Args:
        file_path: Path to ``cti-ate.tsv`` (tab-separated, with URL /
            Platform / Description / Prompt / GT columns).

    Returns:
        datasets.Dataset with one record per example, all fields as
        strings plus a constant ``task_type``.

    Raises:
        KeyError: If any expected column is missing from the TSV.
    """
    logger.info(f"Processing ATE dataset: {file_path}")

    # dtype=str preserves every cell exactly as written in the TSV (the old
    # per-cell str() coerced pandas-inferred numerics, e.g. "1" -> "1.0");
    # fillna('') matches the original pd.notna fallback for missing cells.
    df = pd.read_csv(file_path, sep='\t', dtype=str).fillna('')

    # TSV header -> output field name.
    column_map = {
        'URL': 'url',
        'Platform': 'platform',
        'Description': 'description',
        'Prompt': 'prompt',
        'GT': 'ground_truth',
    }
    # Select/rename in one pass instead of a Python-level iterrows loop.
    processed_data = (
        df[list(column_map)]
        .rename(columns=column_map)
        .to_dict(orient='records')
    )
    for entry in processed_data:
        entry['task_type'] = 'attack_technique_extraction'

    return Dataset.from_list(processed_data)
| |
|
def process_vsp_dataset(file_path):
    """Convert the VSP TSV file into a Hugging Face Dataset.

    Args:
        file_path: Path to ``cti-vsp.tsv`` (tab-separated, with URL /
            Description / Prompt / GT columns; GT holds the CVSS vector).

    Returns:
        datasets.Dataset with one record per CVE, all fields as strings
        plus a constant ``task_type``.

    Raises:
        KeyError: If any expected column is missing from the TSV.
    """
    logger.info(f"Processing VSP dataset: {file_path}")

    # dtype=str preserves every cell exactly as written in the TSV (the old
    # per-cell str() coerced pandas-inferred numerics, e.g. "1" -> "1.0");
    # fillna('') matches the original pd.notna fallback for missing cells.
    df = pd.read_csv(file_path, sep='\t', dtype=str).fillna('')

    # TSV header -> output field name (GT is the ground-truth CVSS vector).
    column_map = {
        'URL': 'url',
        'Description': 'description',
        'Prompt': 'prompt',
        'GT': 'cvss_vector',
    }
    # Select/rename in one pass instead of a Python-level iterrows loop.
    processed_data = (
        df[list(column_map)]
        .rename(columns=column_map)
        .to_dict(orient='records')
    )
    for entry in processed_data:
        entry['task_type'] = 'vulnerability_severity_prediction'

    return Dataset.from_list(processed_data)
| |
|
def process_taa_dataset(file_path):
    """Convert the TAA TSV file (unknown schema) into a Hugging Face Dataset.

    Column names are mapped heuristically onto canonical field names
    (url / description / prompt / ground_truth); anything unrecognized is
    kept under its snake_cased header.

    Args:
        file_path: Path to ``cti-taa.tsv`` (tab-separated).

    Returns:
        datasets.Dataset with one record per row plus a constant
        ``task_type``.
    """
    logger.info(f"Processing TAA dataset: {file_path}")

    def _field_name(col):
        # Heuristic TSV-header -> canonical field mapping (same precedence
        # order as the per-row checks this replaces).
        col_lower = col.lower()
        if 'url' in col_lower:
            return 'url'
        if 'description' in col_lower or 'text' in col_lower:
            return 'description'
        if 'prompt' in col_lower:
            return 'prompt'
        if col == 'GT' or 'ground' in col_lower or 'truth' in col_lower:
            return 'ground_truth'
        return col_lower.replace(' ', '_')

    processed_data = []
    # Stream the file chunk by chunk. The original read chunks only to
    # pd.concat them into one full frame (negating the memory benefit, and
    # crashing on an empty file); here each chunk is consumed directly.
    for chunk in pd.read_csv(file_path, sep='\t', chunksize=10000):
        # Classify each column once per chunk, not once per row.
        field_names = {col: _field_name(col) for col in chunk.columns}
        for _, row in chunk.iterrows():
            entry = {'task_type': 'threat_actor_attribution'}
            for col, field in field_names.items():
                entry[field] = str(row[col]) if pd.notna(row[col]) else ''
            processed_data.append(entry)

    return Dataset.from_list(processed_data)
| |
|
def process_rcm_dataset(file_path):
    """Convert an RCM TSV file (unknown schema) into a Hugging Face Dataset.

    Column names are mapped heuristically onto canonical field names
    (url / description / prompt / ground_truth); anything unrecognized is
    kept under its snake_cased header. Used for both the current and the
    2021 RCM variants.

    Args:
        file_path: Path to ``cti-rcm.tsv`` or ``cti-rcm-2021.tsv``.

    Returns:
        datasets.Dataset with one record per row plus a constant
        ``task_type``.
    """
    logger.info(f"Processing RCM dataset: {file_path}")

    def _field_name(col):
        # Heuristic TSV-header -> canonical field mapping (same precedence
        # order as the per-row checks this replaces).
        col_lower = col.lower()
        if 'url' in col_lower:
            return 'url'
        if 'description' in col_lower or 'text' in col_lower:
            return 'description'
        if 'prompt' in col_lower:
            return 'prompt'
        if col == 'GT' or 'ground' in col_lower or 'truth' in col_lower:
            return 'ground_truth'
        return col_lower.replace(' ', '_')

    processed_data = []
    # Stream the file chunk by chunk. The original read chunks only to
    # pd.concat them into one full frame (negating the memory benefit, and
    # crashing on an empty file); here each chunk is consumed directly.
    for chunk in pd.read_csv(file_path, sep='\t', chunksize=10000):
        # Classify each column once per chunk, not once per row.
        field_names = {col: _field_name(col) for col in chunk.columns}
        for _, row in chunk.iterrows():
            entry = {'task_type': 'reverse_cyber_mapping'}
            for col, field in field_names.items():
                entry[field] = str(row[col]) if pd.notna(row[col]) else ''
            processed_data.append(entry)

    return Dataset.from_list(processed_data)
| |
|
def upload_dataset_to_hub_with_readme(dataset, dataset_name, username, readme_content, token=None):
    """Push a dataset and its README.md to the Hugging Face Hub.

    Args:
        dataset: datasets.Dataset to push.
        dataset_name: Repository name (without the username prefix).
        username: Hub account that owns the target repository.
        readme_content: Markdown text uploaded as the repo's README.md.
        token: Optional auth token; falls back to the cached CLI login.

    Returns:
        True on success, False if any step raised (the error is logged).
    """
    repo_id = f"{username}/{dataset_name}"
    try:
        logger.info(f"Uploading {dataset_name} to Hugging Face Hub...")

        # Public dataset repo; push_to_hub creates it if needed.
        dataset.push_to_hub(
            repo_id=repo_id,
            token=token,
            private=False
        )

        # upload_file accepts raw bytes for path_or_fileobj, so the README
        # never needs to touch the local filesystem (the previous
        # NamedTemporaryFile/unlink round-trip is unnecessary).
        api = HfApi()
        api.upload_file(
            path_or_fileobj=readme_content.encode('utf-8'),
            path_in_repo="README.md",
            repo_id=repo_id,
            repo_type="dataset",
            token=token
        )

        logger.info(f"Successfully uploaded {dataset_name} with documentation to {repo_id}")
        return True

    except Exception as e:
        # Broad catch is deliberate: one failed upload must not abort the
        # batch in main(); the caller checks the boolean result.
        logger.error(f"Error uploading {dataset_name}: {str(e)}")
        return False
| |
|
def main():
    """Parse CLI arguments, convert each known TSV file, and upload the results.

    For every file in the fixed filename -> (repo name, processor, README
    generator) table: skip with a warning if missing, otherwise build the
    dataset, generate its README, and upload both. A per-file failure is
    logged and recorded but does not stop the remaining files.
    """
    parser = argparse.ArgumentParser(description='Process CTI-bench TSV files and upload to Hugging Face Hub with documentation')
    parser.add_argument('--username', default='tuandunghcmut', help='Hugging Face username')
    parser.add_argument('--token', help='Hugging Face token (optional if logged in via CLI)')
    parser.add_argument('--data-dir', default='cti-bench/data', help='Directory containing TSV files')

    args = parser.parse_args()

    data_dir = Path(args.data_dir)

    # filename -> (Hub repo name, TSV processor, README generator).
    # RCM uses lambdas so the shared generator gets the right variant label.
    file_processors = {
        'cti-mcq.tsv': ('cti_bench_mcq', process_mcq_dataset, generate_mcq_readme),
        'cti-ate.tsv': ('cti_bench_ate', process_ate_dataset, generate_ate_readme),
        'cti-vsp.tsv': ('cti_bench_vsp', process_vsp_dataset, generate_vsp_readme),
        'cti-taa.tsv': ('cti_bench_taa', process_taa_dataset, generate_taa_readme),
        'cti-rcm.tsv': ('cti_bench_rcm', process_rcm_dataset, lambda size: generate_rcm_readme(size)),
        'cti-rcm-2021.tsv': ('cti_bench_rcm_2021', process_rcm_dataset, lambda size: generate_rcm_readme(size, "2021")),
    }

    successful_uploads = []
    failed_uploads = []

    for filename, (dataset_name, processor_func, readme_generator) in file_processors.items():
        file_path = data_dir / filename

        if not file_path.exists():
            logger.warning(f"File not found: {file_path}")
            failed_uploads.append(filename)
            continue

        try:
            # Fix: these messages previously printed the literal text
            # "(unknown)" instead of the file actually being processed.
            logger.info(f"Processing {filename}...")

            dataset = processor_func(file_path)
            dataset_size = len(dataset)
            logger.info(f"Created dataset with {dataset_size:,} entries")

            readme_content = readme_generator(dataset_size)

            success = upload_dataset_to_hub_with_readme(
                dataset, dataset_name, args.username, readme_content, args.token
            )

            if success:
                successful_uploads.append(dataset_name)
                logger.info(f"✅ Successfully processed and uploaded: {dataset_name}")
            else:
                failed_uploads.append(filename)
                logger.error(f"❌ Failed to upload: {dataset_name}")

        except Exception as e:
            # Keep going: a single bad file must not abort the whole batch.
            logger.error(f"❌ Error processing {filename}: {str(e)}")
            failed_uploads.append(filename)

    # Final summary with one Hub URL per successful upload.
    logger.info(f"\n🎉 Processing complete!")
    logger.info(f"✅ Successfully uploaded {len(successful_uploads)} datasets with documentation:")
    for name in successful_uploads:
        logger.info(f"  - https://huggingface.co/datasets/{args.username}/{name}")

    if failed_uploads:
        logger.info(f"❌ Failed to process {len(failed_uploads)} files:")
        for name in failed_uploads:
            logger.info(f"  - {name}")

    logger.info(f"\nVisit https://huggingface.co/{args.username} to see your uploaded datasets with full documentation!")
| |
|
# Run the full convert-and-upload pipeline only when executed as a script,
# so the module can be imported without side effects.
if __name__ == "__main__":
    main()
| |
|