|
""" |
|
Research Collaboration Framework for Cyber-LLM |
|
Enables secure sharing of cybersecurity insights and collaborative research across organizations. |
|
|
|
Author: Muzan Sano <sanosensei36@gmail.com> |
|
""" |
|
|
|
import asyncio
import base64
import copy
import json
import uuid
from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import redis
import yaml
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding

from ..utils.logging_system import CyberLLMLogger
|
|
|
|
|
logger = CyberLLMLogger(__name__).get_logger() |
|
|
|
class CollaborationType(Enum): |
|
"""Types of research collaboration""" |
|
THREAT_INTELLIGENCE_SHARING = "threat_intelligence_sharing" |
|
ATTACK_PATTERN_ANALYSIS = "attack_pattern_analysis" |
|
DEFENSE_STRATEGY_DEVELOPMENT = "defense_strategy_development" |
|
VULNERABILITY_RESEARCH = "vulnerability_research" |
|
INCIDENT_CASE_STUDIES = "incident_case_studies" |
|
TOOL_BENCHMARKING = "tool_benchmarking" |
|
DATASET_SHARING = "dataset_sharing" |
|
|
|
class ParticipantRole(Enum): |
|
"""Roles in research collaboration""" |
|
COORDINATOR = "coordinator" |
|
CONTRIBUTOR = "contributor" |
|
VALIDATOR = "validator" |
|
OBSERVER = "observer" |
|
ANALYST = "analyst" |
|
|
|
class SensitivityLevel(Enum): |
|
"""Data sensitivity levels for sharing""" |
|
PUBLIC = "public" |
|
CONSORTIUM = "consortium" |
|
BILATERAL = "bilateral" |
|
INTERNAL = "internal" |
|
CLASSIFIED = "classified" |
|
|
|
@dataclass |
|
class ResearchInsight: |
|
"""Structure for research insights""" |
|
insight_id: str |
|
title: str |
|
description: str |
|
collaboration_type: CollaborationType |
|
sensitivity_level: SensitivityLevel |
|
|
|
|
|
findings: Dict[str, Any] |
|
evidence: List[Dict[str, Any]] |
|
methodology: Dict[str, Any] |
|
|
|
|
|
contributor_org: str |
|
contributors: List[str] |
|
created_at: datetime |
|
updated_at: datetime |
|
version: str |
|
|
|
|
|
validation_status: str = "pending" |
|
validators: List[str] = field(default_factory=list) |
|
validation_feedback: List[Dict[str, Any]] = field(default_factory=list) |
|
|
|
|
|
anonymized: bool = False |
|
data_retention_days: Optional[int] = None |
|
access_log: List[Dict[str, Any]] = field(default_factory=list) |
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
"""Convert to dictionary for serialization""" |
|
return { |
|
'insight_id': self.insight_id, |
|
'title': self.title, |
|
'description': self.description, |
|
'collaboration_type': self.collaboration_type.value, |
|
'sensitivity_level': self.sensitivity_level.value, |
|
'findings': self.findings, |
|
'evidence': self.evidence, |
|
'methodology': self.methodology, |
|
'contributor_org': self.contributor_org, |
|
'contributors': self.contributors, |
|
'created_at': self.created_at.isoformat(), |
|
'updated_at': self.updated_at.isoformat(), |
|
'version': self.version, |
|
'validation_status': self.validation_status, |
|
'validators': self.validators, |
|
'validation_feedback': self.validation_feedback, |
|
'anonymized': self.anonymized, |
|
'data_retention_days': self.data_retention_days, |
|
'access_log': self.access_log |
|
} |
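
# Illustrative sketch (all field values are hypothetical): constructing a
# minimal ResearchInsight by hand, with only the required fields filled in.
def _example_insight() -> ResearchInsight:
    now = datetime.now()
    return ResearchInsight(
        insight_id=f"insight_{uuid.uuid4().hex[:8]}",
        title="C2 domain rotation pattern",
        description="Observed daily rotation of command-and-control domains",
        collaboration_type=CollaborationType.THREAT_INTELLIGENCE_SHARING,
        sensitivity_level=SensitivityLevel.CONSORTIUM,
        findings={"rotation_period_hours": 24},
        evidence=[{"source": "sandbox_run_42"}],
        methodology={"approach": "passive DNS analysis"},
        contributor_org="ExampleOrg",
        contributors=["Demo Researcher"],
        created_at=now,
        updated_at=now,
        version="1.0",
    )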
|
|
|
@dataclass |
|
class CollaborationParticipant: |
|
"""Research collaboration participant""" |
|
participant_id: str |
|
organization: str |
|
name: str |
|
email: str |
|
role: ParticipantRole |
|
public_key: str |
|
|
|
|
|
expertise_areas: List[str] |
|
research_interests: List[CollaborationType] |
|
data_sharing_policy: Dict[str, Any] |
|
|
|
|
|
status: str = "active" |
|
joined_at: datetime = field(default_factory=datetime.now) |
|
last_active: Optional[datetime] = None |
|
|
|
|
|
contributions_count: int = 0 |
|
validations_count: int = 0 |
|
reputation_score: float = 0.0 |
|
|
|
@dataclass |
|
class CollaborationProject: |
|
"""Research collaboration project""" |
|
project_id: str |
|
name: str |
|
description: str |
|
collaboration_type: CollaborationType |
|
|
|
|
|
coordinator: str |
|
participants: List[str] |
|
created_at: datetime |
|
deadline: Optional[datetime] |
|
|
|
|
|
sensitivity_level: SensitivityLevel |
|
data_sharing_rules: Dict[str, Any] |
|
validation_requirements: Dict[str, Any] |
|
|
|
|
|
status: str = "active" |
|
progress: float = 0.0 |
|
|
|
|
|
insights: List[str] = field(default_factory=list) |
|
deliverables: List[Dict[str, Any]] = field(default_factory=list) |
|
|
|
class SecureCollaborationProtocol: |
|
"""Secure communication protocol for research collaboration""" |
|
|
|
def __init__(self, private_key_path: str, public_key_path: str): |
|
self.private_key = self._load_private_key(private_key_path) |
|
self.public_key = self._load_public_key(public_key_path) |
|
|
|
|
|
self.participant_keys: Dict[str, Any] = {} |
|
|
|
def _load_private_key(self, key_path: str): |
|
"""Load private key from file""" |
|
try: |
|
with open(key_path, 'rb') as f: |
|
return serialization.load_pem_private_key( |
|
f.read(), password=None, backend=default_backend() |
|
) |
|
except FileNotFoundError: |
|
logger.warning(f"Private key not found at {key_path}, generating new key") |
|
return self._generate_key_pair(key_path) |
|
|
|
def _load_public_key(self, key_path: str): |
|
"""Load public key from file""" |
|
try: |
|
with open(key_path, 'rb') as f: |
|
return serialization.load_pem_public_key( |
|
f.read(), backend=default_backend() |
|
) |
|
except FileNotFoundError: |
|
return self.private_key.public_key() |
|
|
|
def _generate_key_pair(self, private_key_path: str): |
|
"""Generate new RSA key pair""" |
|
private_key = rsa.generate_private_key( |
|
public_exponent=65537, |
|
key_size=2048, |
|
backend=default_backend() |
|
) |
|
|
|
|
|
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )

        # Ensure the key directory exists before writing
        Path(private_key_path).parent.mkdir(parents=True, exist_ok=True)
        with open(private_key_path, 'wb') as f:
            f.write(private_pem)
|
|
|
|
|
public_key = private_key.public_key() |
|
public_pem = public_key.public_bytes( |
|
encoding=serialization.Encoding.PEM, |
|
format=serialization.PublicFormat.SubjectPublicKeyInfo |
|
) |
|
|
|
public_key_path = private_key_path.replace('private', 'public') |
|
with open(public_key_path, 'wb') as f: |
|
f.write(public_pem) |
|
|
|
logger.info(f"Generated new key pair: {private_key_path}") |
|
return private_key |
|
|
|
def encrypt_data(self, data: Dict[str, Any], recipient_public_key: str) -> str: |
|
"""Encrypt data for specific recipient""" |
|
try: |
|
|
|
data_json = json.dumps(data, default=str).encode('utf-8') |
|
|
|
|
|
recipient_key = serialization.load_pem_public_key( |
|
recipient_public_key.encode(), backend=default_backend() |
|
) |
|
|
|
|
|
encrypted_data = recipient_key.encrypt( |
|
data_json, |
|
padding.OAEP( |
|
mgf=padding.MGF1(algorithm=hashes.SHA256()), |
|
algorithm=hashes.SHA256(), |
|
label=None |
|
) |
|
) |
|
|
|
|
|
signature = self.private_key.sign( |
|
encrypted_data, |
|
padding.PSS( |
|
mgf=padding.MGF1(hashes.SHA256()), |
|
salt_length=padding.PSS.MAX_LENGTH |
|
), |
|
hashes.SHA256() |
|
) |
|
|
|
|
|
payload = { |
|
'encrypted_data': base64.b64encode(encrypted_data).decode(), |
|
'signature': base64.b64encode(signature).decode(), |
|
'timestamp': datetime.now().isoformat() |
|
} |
|
|
|
return base64.b64encode(json.dumps(payload).encode()).decode() |
|
|
|
except Exception as e: |
|
logger.error(f"Encryption failed: {str(e)}") |
|
raise |
|
|
|
def decrypt_data(self, encrypted_payload: str, sender_public_key: str) -> Dict[str, Any]: |
|
"""Decrypt data from sender""" |
|
try: |
|
|
|
payload = json.loads(base64.b64decode(encrypted_payload).decode()) |
|
encrypted_data = base64.b64decode(payload['encrypted_data']) |
|
signature = base64.b64decode(payload['signature']) |
|
|
|
|
|
sender_key = serialization.load_pem_public_key( |
|
sender_public_key.encode(), backend=default_backend() |
|
) |
|
|
|
|
|
sender_key.verify( |
|
signature, |
|
encrypted_data, |
|
padding.PSS( |
|
mgf=padding.MGF1(hashes.SHA256()), |
|
salt_length=padding.PSS.MAX_LENGTH |
|
), |
|
hashes.SHA256() |
|
) |
|
|
|
|
|
decrypted_data = self.private_key.decrypt( |
|
encrypted_data, |
|
padding.OAEP( |
|
mgf=padding.MGF1(algorithm=hashes.SHA256()), |
|
algorithm=hashes.SHA256(), |
|
label=None |
|
) |
|
) |
|
|
|
return json.loads(decrypted_data.decode('utf-8')) |
|
|
|
except Exception as e: |
|
logger.error(f"Decryption failed: {str(e)}") |
|
raise |
|
|
|
def register_participant_key(self, participant_id: str, public_key: str): |
|
"""Register participant's public key""" |
|
self.participant_keys[participant_id] = public_key |
|
logger.info(f"Registered public key for participant: {participant_id}") |
|
|
|
class PrivacyPreservingAnalytics: |
|
"""Privacy-preserving analytics for collaborative research""" |
|
|
|
def __init__(self): |
|
self.anonymization_functions = { |
|
'k_anonymity': self._apply_k_anonymity, |
|
'differential_privacy': self._apply_differential_privacy, |
|
'homomorphic': self._apply_homomorphic_encryption |
|
} |
|
|
|
def anonymize_insight(self, insight: ResearchInsight, method: str = 'k_anonymity') -> ResearchInsight: |
|
"""Anonymize research insight""" |
|
|
|
if method not in self.anonymization_functions: |
|
raise ValueError(f"Unsupported anonymization method: {method}") |
|
|
|
try: |
|
anonymized_insight = self.anonymization_functions[method](insight) |
|
anonymized_insight.anonymized = True |
|
|
|
logger.info(f"Applied {method} anonymization to insight: {insight.insight_id}") |
|
return anonymized_insight |
|
|
|
except Exception as e: |
|
logger.error(f"Anonymization failed: {str(e)}") |
|
raise |
|
|
|
    def _apply_k_anonymity(self, insight: ResearchInsight, k: int = 5) -> ResearchInsight:
        """Apply k-anonymity-style generalization to an insight"""
        # Work on a copy so the caller's insight is not mutated
        anonymized_insight = copy.deepcopy(insight)

        # Pseudonymize contributor identifiers
        # NOTE: built-in hash() is salted per process, so these pseudonyms are
        # not stable across runs
        anonymized_insight.contributor_org = f"Organization_{hash(insight.contributor_org) % 1000}"
        anonymized_insight.contributors = [f"Researcher_{i}" for i in range(len(insight.contributors))]

        # Generalize quasi-identifiers in the findings
        if 'ip_addresses' in anonymized_insight.findings:
            ips = anonymized_insight.findings['ip_addresses']
            anonymized_insight.findings['ip_addresses'] = [
                '.'.join(ip.split('.')[:2] + ['x', 'x']) for ip in ips
            ]

        if 'timestamps' in anonymized_insight.findings:
            timestamps = anonymized_insight.findings['timestamps']
            anonymized_insight.findings['timestamps'] = [
                ts[:10] for ts in timestamps  # truncate to date precision
            ]

        return anonymized_insight
|
|
|
    def _apply_differential_privacy(self, insight: ResearchInsight, epsilon: float = 1.0) -> ResearchInsight:
        """Add Laplace noise to numeric findings (differential privacy)"""
        import numpy as np

        # Work on a copy so the caller's insight is not mutated
        anonymized_insight = copy.deepcopy(insight)

        for key, value in insight.findings.items():
            if isinstance(value, (int, float)):
                # Laplace mechanism: noise scale = sensitivity / epsilon
                sensitivity = 1.0
                scale = sensitivity / epsilon
                noise = np.random.laplace(0, scale)
                # Clamping to non-negative slightly weakens the formal guarantee
                anonymized_insight.findings[key] = max(0, value + noise)

        return anonymized_insight
|
|
|
    def _apply_homomorphic_encryption(self, insight: ResearchInsight) -> ResearchInsight:
        """Placeholder for homomorphic encryption of numeric findings"""
        # NOTE: this is a stand-in, not real homomorphic encryption. Numeric
        # values are replaced with opaque tokens; a production implementation
        # would use a scheme such as Paillier or CKKS via a dedicated library.
        anonymized_insight = copy.deepcopy(insight)

        for key, value in insight.findings.items():
            if isinstance(value, (int, float)):
                encrypted_value = f"HE_encrypted_{hash(str(value)) % 10000}"
                anonymized_insight.findings[key] = encrypted_value

        return anonymized_insight
|
|
|
def compute_privacy_risk_score(self, insight: ResearchInsight) -> float: |
|
"""Compute privacy risk score for insight""" |
|
|
|
risk_score = 0.0 |
|
|
|
|
|
if not insight.anonymized: |
|
risk_score += 0.3 |
|
|
|
|
|
sensitivity_risk = { |
|
SensitivityLevel.PUBLIC: 0.0, |
|
SensitivityLevel.CONSORTIUM: 0.1, |
|
SensitivityLevel.BILATERAL: 0.2, |
|
SensitivityLevel.INTERNAL: 0.4, |
|
SensitivityLevel.CLASSIFIED: 0.8 |
|
} |
|
risk_score += sensitivity_risk.get(insight.sensitivity_level, 0.5) |
|
|
|
|
|
pii_indicators = ['email', 'ip', 'username', 'id', 'address'] |
|
for indicator in pii_indicators: |
|
if any(indicator in str(value).lower() for value in insight.findings.values()): |
|
risk_score += 0.1 |
|
|
|
|
|
if insight.data_retention_days is None: |
|
risk_score += 0.1 |
|
|
|
return min(1.0, risk_score) |
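
# Illustrative sketch: anonymize an insight (e.g. one built like
# _example_insight above) and check its residual privacy risk before sharing.
def _example_privacy_check(insight: ResearchInsight) -> float:
    analytics = PrivacyPreservingAnalytics()
    anonymized = analytics.anonymize_insight(insight, method="k_anonymity")
    risk = analytics.compute_privacy_risk_score(anonymized)
    if risk > 0.7:
        logger.warning(f"Residual privacy risk still high: {risk:.2f}")
    return risk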
|
|
|
class CollaborationRepository: |
|
"""Repository for managing collaboration data""" |
|
|
|
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379): |
|
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True) |
|
|
|
|
|
self.participants: Dict[str, CollaborationParticipant] = {} |
|
self.projects: Dict[str, CollaborationProject] = {} |
|
self.insights: Dict[str, ResearchInsight] = {} |
|
|
|
|
|
self._load_data() |
|
|
|
    def _load_data(self):
        """Load existing data from Redis"""
        try:
            # Participants (enum/datetime fields are stored as plain strings)
            participant_ids = self.redis_client.smembers("collaboration:participants")
            for pid in participant_ids:
                data = self.redis_client.hget("collaboration:participant", pid)
                if data:
                    fields = json.loads(data)
                    fields['role'] = ParticipantRole(fields['role'])
                    fields['research_interests'] = [CollaborationType(c) for c in fields['research_interests']]
                    fields['joined_at'] = datetime.fromisoformat(fields['joined_at'])
                    if fields.get('last_active'):
                        fields['last_active'] = datetime.fromisoformat(fields['last_active'])
                    self.participants[pid] = CollaborationParticipant(**fields)

            # Projects
            project_ids = self.redis_client.smembers("collaboration:projects")
            for proj_id in project_ids:
                data = self.redis_client.hget("collaboration:project", proj_id)
                if data:
                    fields = json.loads(data)
                    fields['collaboration_type'] = CollaborationType(fields['collaboration_type'])
                    fields['sensitivity_level'] = SensitivityLevel(fields['sensitivity_level'])
                    fields['created_at'] = datetime.fromisoformat(fields['created_at'])
                    if fields.get('deadline'):
                        fields['deadline'] = datetime.fromisoformat(fields['deadline'])
                    self.projects[proj_id] = CollaborationProject(**fields)

            # Insights (persisted via ResearchInsight.to_dict)
            insight_ids = self.redis_client.smembers("collaboration:insights")
            for insight_id in insight_ids:
                data = self.redis_client.hget("collaboration:insight", insight_id)
                if data:
                    fields = json.loads(data)
                    fields['collaboration_type'] = CollaborationType(fields['collaboration_type'])
                    fields['sensitivity_level'] = SensitivityLevel(fields['sensitivity_level'])
                    fields['created_at'] = datetime.fromisoformat(fields['created_at'])
                    fields['updated_at'] = datetime.fromisoformat(fields['updated_at'])
                    self.insights[insight_id] = ResearchInsight(**fields)

            logger.info(f"Loaded {len(self.participants)} participants, "
                        f"{len(self.projects)} projects, {len(self.insights)} insights")

        except Exception as e:
            logger.error(f"Failed to load data from Redis: {str(e)}")
|
|
|
    def save_participant(self, participant: CollaborationParticipant):
        """Save participant to repository"""
        try:
            self.participants[participant.participant_id] = participant

            # Serialize enum/datetime fields explicitly so they round-trip on load
            record = asdict(participant)
            record['role'] = participant.role.value
            record['research_interests'] = [c.value for c in participant.research_interests]
            record['joined_at'] = participant.joined_at.isoformat()
            record['last_active'] = participant.last_active.isoformat() if participant.last_active else None

            self.redis_client.sadd("collaboration:participants", participant.participant_id)
            self.redis_client.hset(
                "collaboration:participant",
                participant.participant_id,
                json.dumps(record, default=str)
            )

            logger.info(f"Saved participant: {participant.participant_id}")

        except Exception as e:
            logger.error(f"Failed to save participant: {str(e)}")
            raise
|
|
|
    def save_project(self, project: CollaborationProject):
        """Save project to repository"""
        try:
            self.projects[project.project_id] = project

            # Serialize enum/datetime fields explicitly so they round-trip on load
            record = asdict(project)
            record['collaboration_type'] = project.collaboration_type.value
            record['sensitivity_level'] = project.sensitivity_level.value
            record['created_at'] = project.created_at.isoformat()
            record['deadline'] = project.deadline.isoformat() if project.deadline else None

            self.redis_client.sadd("collaboration:projects", project.project_id)
            self.redis_client.hset(
                "collaboration:project",
                project.project_id,
                json.dumps(record, default=str)
            )

            logger.info(f"Saved project: {project.project_id}")

        except Exception as e:
            logger.error(f"Failed to save project: {str(e)}")
            raise
|
|
|
    def save_insight(self, insight: ResearchInsight):
        """Save insight to repository"""
        try:
            self.insights[insight.insight_id] = insight

            # Record the save in the access log before persisting, so the
            # entry is included in the stored copy
            insight.access_log.append({
                'action': 'save',
                'timestamp': datetime.now().isoformat(),
                'user': 'system'
            })

            self.redis_client.sadd("collaboration:insights", insight.insight_id)
            self.redis_client.hset(
                "collaboration:insight",
                insight.insight_id,
                json.dumps(insight.to_dict())
            )

            logger.info(f"Saved insight: {insight.insight_id}")

        except Exception as e:
            logger.error(f"Failed to save insight: {str(e)}")
            raise
|
|
|
def get_participant(self, participant_id: str) -> Optional[CollaborationParticipant]: |
|
"""Get participant by ID""" |
|
return self.participants.get(participant_id) |
|
|
|
def get_project(self, project_id: str) -> Optional[CollaborationProject]: |
|
"""Get project by ID""" |
|
return self.projects.get(project_id) |
|
|
|
def get_insight(self, insight_id: str) -> Optional[ResearchInsight]: |
|
"""Get insight by ID""" |
|
insight = self.insights.get(insight_id) |
|
|
|
if insight: |
|
|
|
access_entry = { |
|
'action': 'access', |
|
'timestamp': datetime.now().isoformat(), |
|
'user': 'system' |
|
} |
|
insight.access_log.append(access_entry) |
|
|
|
return insight |
|
|
|
def search_insights(self, |
|
collaboration_type: Optional[CollaborationType] = None, |
|
sensitivity_level: Optional[SensitivityLevel] = None, |
|
contributor_org: Optional[str] = None) -> List[ResearchInsight]: |
|
"""Search insights by criteria""" |
|
|
|
results = [] |
|
|
|
for insight in self.insights.values(): |
|
if (collaboration_type is None or insight.collaboration_type == collaboration_type) and \ |
|
(sensitivity_level is None or insight.sensitivity_level == sensitivity_level) and \ |
|
(contributor_org is None or insight.contributor_org == contributor_org): |
|
results.append(insight) |
|
|
|
return results |
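
# Illustrative sketch: query the repository for consortium-level
# threat-intelligence insights from a given (hypothetical) organization.
def _example_search(repo: CollaborationRepository) -> List[ResearchInsight]:
    return repo.search_insights(
        collaboration_type=CollaborationType.THREAT_INTELLIGENCE_SHARING,
        sensitivity_level=SensitivityLevel.CONSORTIUM,
        contributor_org="ExampleOrg",
    )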
|
|
|
class ResearchCollaborationManager: |
|
"""Main manager for research collaboration""" |
|
|
|
def __init__(self, |
|
organization_name: str, |
|
private_key_path: str = "keys/collaboration_private.pem", |
|
public_key_path: str = "keys/collaboration_public.pem"): |
|
|
|
self.organization_name = organization_name |
|
|
|
|
|
self.security_protocol = SecureCollaborationProtocol(private_key_path, public_key_path) |
|
self.privacy_analytics = PrivacyPreservingAnalytics() |
|
self.repository = CollaborationRepository() |
|
|
|
|
|
self.collaboration_config = self._load_collaboration_config() |
|
|
|
logger.info(f"ResearchCollaborationManager initialized for: {organization_name}") |
|
|
|
def _load_collaboration_config(self) -> Dict[str, Any]: |
|
"""Load collaboration configuration""" |
|
|
|
config_path = Path("configs/collaboration.yaml") |
|
|
|
if config_path.exists(): |
|
with open(config_path, 'r') as f: |
|
return yaml.safe_load(f) |
|
else: |
|
|
|
default_config = { |
|
'default_sensitivity_level': SensitivityLevel.CONSORTIUM.value, |
|
'auto_validation_enabled': True, |
|
'data_retention_days': 365, |
|
'privacy_method': 'k_anonymity', |
|
'min_validation_score': 0.8, |
|
'collaboration_timeout_hours': 72 |
|
} |
|
|
|
|
|
config_path.parent.mkdir(exist_ok=True) |
|
with open(config_path, 'w') as f: |
|
yaml.dump(default_config, f) |
|
|
|
return default_config |
|
|
|
async def create_collaboration_project(self, |
|
name: str, |
|
description: str, |
|
collaboration_type: CollaborationType, |
|
coordinator_id: str, |
|
participants: List[str], |
|
sensitivity_level: SensitivityLevel = SensitivityLevel.CONSORTIUM, |
|
deadline: Optional[datetime] = None) -> str: |
|
"""Create new collaboration project""" |
|
|
|
project_id = f"proj_{uuid.uuid4().hex[:8]}" |
|
|
|
project = CollaborationProject( |
|
project_id=project_id, |
|
name=name, |
|
description=description, |
|
collaboration_type=collaboration_type, |
|
coordinator=coordinator_id, |
|
participants=participants, |
|
created_at=datetime.now(), |
|
deadline=deadline, |
|
sensitivity_level=sensitivity_level, |
|
data_sharing_rules={ |
|
'anonymization_required': sensitivity_level != SensitivityLevel.PUBLIC, |
|
'validation_required': True, |
|
'retention_days': self.collaboration_config.get('data_retention_days', 365) |
|
}, |
|
validation_requirements={ |
|
'min_validators': 2, |
|
'min_score': self.collaboration_config.get('min_validation_score', 0.8) |
|
} |
|
) |
|
|
|
self.repository.save_project(project) |
|
|
|
logger.info(f"Created collaboration project: {project_id} - {name}") |
|
return project_id |
|
|
|
async def contribute_insight(self, |
|
project_id: str, |
|
title: str, |
|
description: str, |
|
findings: Dict[str, Any], |
|
evidence: List[Dict[str, Any]], |
|
methodology: Dict[str, Any], |
|
contributor_id: str) -> str: |
|
"""Contribute research insight to project""" |
|
|
|
project = self.repository.get_project(project_id) |
|
if not project: |
|
raise ValueError(f"Project not found: {project_id}") |
|
|
|
contributor = self.repository.get_participant(contributor_id) |
|
if not contributor: |
|
raise ValueError(f"Contributor not found: {contributor_id}") |
|
|
|
insight_id = f"insight_{uuid.uuid4().hex[:8]}" |
|
|
|
insight = ResearchInsight( |
|
insight_id=insight_id, |
|
title=title, |
|
description=description, |
|
collaboration_type=project.collaboration_type, |
|
sensitivity_level=project.sensitivity_level, |
|
findings=findings, |
|
evidence=evidence, |
|
methodology=methodology, |
|
contributor_org=contributor.organization, |
|
contributors=[contributor.name], |
|
created_at=datetime.now(), |
|
updated_at=datetime.now(), |
|
version="1.0", |
|
data_retention_days=project.data_sharing_rules.get('retention_days') |
|
) |
|
|
|
|
|
if project.data_sharing_rules.get('anonymization_required', False): |
|
privacy_method = self.collaboration_config.get('privacy_method', 'k_anonymity') |
|
insight = self.privacy_analytics.anonymize_insight(insight, privacy_method) |
|
|
|
|
|
privacy_risk = self.privacy_analytics.compute_privacy_risk_score(insight) |
|
if privacy_risk > 0.7: |
|
logger.warning(f"High privacy risk detected for insight: {insight_id} (risk: {privacy_risk:.2f})") |
|
|
|
self.repository.save_insight(insight) |
|
|
|
|
|
project.insights.append(insight_id) |
|
self.repository.save_project(project) |
|
|
|
|
|
contributor.contributions_count += 1 |
|
contributor.last_active = datetime.now() |
|
self.repository.save_participant(contributor) |
|
|
|
logger.info(f"Contributed insight: {insight_id} to project: {project_id}") |
|
return insight_id |
|
|
|
async def validate_insight(self, |
|
insight_id: str, |
|
validator_id: str, |
|
validation_score: float, |
|
feedback: str) -> bool: |
|
"""Validate research insight""" |
|
|
|
insight = self.repository.get_insight(insight_id) |
|
if not insight: |
|
raise ValueError(f"Insight not found: {insight_id}") |
|
|
|
validator = self.repository.get_participant(validator_id) |
|
if not validator: |
|
raise ValueError(f"Validator not found: {validator_id}") |
|
|
|
|
|
validation_feedback = { |
|
'validator_id': validator_id, |
|
'validator_name': validator.name, |
|
'score': validation_score, |
|
'feedback': feedback, |
|
'timestamp': datetime.now().isoformat() |
|
} |
|
|
|
insight.validation_feedback.append(validation_feedback) |
|
insight.validators.append(validator_id) |
|
|
|
|
|
if len(insight.validators) >= 2: |
|
avg_score = sum(vf['score'] for vf in insight.validation_feedback) / len(insight.validation_feedback) |
|
min_score = self.collaboration_config.get('min_validation_score', 0.8) |
|
|
|
if avg_score >= min_score: |
|
insight.validation_status = "validated" |
|
logger.info(f"Insight {insight_id} validated with score: {avg_score:.2f}") |
|
else: |
|
insight.validation_status = "disputed" |
|
logger.warning(f"Insight {insight_id} disputed with score: {avg_score:.2f}") |
|
|
|
self.repository.save_insight(insight) |
|
|
|
|
|
validator.validations_count += 1 |
|
validator.last_active = datetime.now() |
|
self.repository.save_participant(validator) |
|
|
|
return insight.validation_status == "validated" |
|
|
|
async def share_insight_securely(self, |
|
insight_id: str, |
|
recipient_ids: List[str]) -> Dict[str, str]: |
|
"""Share insight securely with specific recipients""" |
|
|
|
insight = self.repository.get_insight(insight_id) |
|
if not insight: |
|
raise ValueError(f"Insight not found: {insight_id}") |
|
|
|
shared_data = {} |
|
|
|
for recipient_id in recipient_ids: |
|
recipient = self.repository.get_participant(recipient_id) |
|
if not recipient: |
|
logger.warning(f"Recipient not found: {recipient_id}") |
|
continue |
|
|
|
try: |
|
|
|
encrypted_payload = self.security_protocol.encrypt_data( |
|
insight.to_dict(), |
|
recipient.public_key |
|
) |
|
|
|
shared_data[recipient_id] = encrypted_payload |
|
|
|
logger.info(f"Encrypted insight {insight_id} for recipient: {recipient_id}") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to encrypt for {recipient_id}: {str(e)}") |
|
|
|
return shared_data |
|
|
|
def generate_collaboration_report(self, project_id: str) -> Dict[str, Any]: |
|
"""Generate comprehensive collaboration report""" |
|
|
|
project = self.repository.get_project(project_id) |
|
if not project: |
|
raise ValueError(f"Project not found: {project_id}") |
|
|
|
|
|
project_insights = [] |
|
for insight_id in project.insights: |
|
insight = self.repository.get_insight(insight_id) |
|
if insight: |
|
project_insights.append(insight) |
|
|
|
|
|
total_insights = len(project_insights) |
|
validated_insights = len([i for i in project_insights if i.validation_status == "validated"]) |
|
disputed_insights = len([i for i in project_insights if i.validation_status == "disputed"]) |
|
pending_insights = total_insights - validated_insights - disputed_insights |
|
|
|
|
|
participant_contributions = {} |
|
for insight in project_insights: |
|
org = insight.contributor_org |
|
participant_contributions[org] = participant_contributions.get(org, 0) + 1 |
|
|
|
|
|
validation_scores = [] |
|
for insight in project_insights: |
|
if insight.validation_feedback: |
|
avg_score = sum(vf['score'] for vf in insight.validation_feedback) / len(insight.validation_feedback) |
|
validation_scores.append(avg_score) |
|
|
|
avg_validation_score = sum(validation_scores) / len(validation_scores) if validation_scores else 0.0 |
|
|
|
return { |
|
'project_info': { |
|
'project_id': project.project_id, |
|
'name': project.name, |
|
'collaboration_type': project.collaboration_type.value, |
|
'status': project.status, |
|
'created_at': project.created_at.isoformat(), |
|
'participants_count': len(project.participants) |
|
}, |
|
'insight_statistics': { |
|
'total_insights': total_insights, |
|
'validated_insights': validated_insights, |
|
'disputed_insights': disputed_insights, |
|
'pending_insights': pending_insights, |
|
'validation_rate': validated_insights / total_insights if total_insights > 0 else 0.0 |
|
}, |
|
'validation_metrics': { |
|
'average_validation_score': avg_validation_score, |
|
'total_validations': sum(len(i.validators) for i in project_insights), |
|
'unique_validators': len(set(v for i in project_insights for v in i.validators)) |
|
}, |
|
'participant_contributions': participant_contributions, |
|
'collaboration_effectiveness': { |
|
'insights_per_participant': total_insights / len(project.participants) if project.participants else 0.0, |
|
'validation_coverage': len([i for i in project_insights if i.validators]) / total_insights if total_insights > 0 else 0.0 |
|
} |
|
} |
|
|
|
def get_collaboration_statistics(self) -> Dict[str, Any]: |
|
"""Get overall collaboration statistics""" |
|
|
|
total_participants = len(self.repository.participants) |
|
total_projects = len(self.repository.projects) |
|
total_insights = len(self.repository.insights) |
|
|
|
|
|
active_projects = len([p for p in self.repository.projects.values() if p.status == "active"]) |
|
|
|
|
|
thirty_days_ago = datetime.now() - timedelta(days=30) |
|
recent_insights = len([ |
|
i for i in self.repository.insights.values() |
|
if i.created_at >= thirty_days_ago |
|
]) |
|
|
|
|
|
collab_type_dist = {} |
|
for project in self.repository.projects.values(): |
|
ct = project.collaboration_type.value |
|
collab_type_dist[ct] = collab_type_dist.get(ct, 0) + 1 |
|
|
|
return { |
|
'overview': { |
|
'total_participants': total_participants, |
|
'total_projects': total_projects, |
|
'total_insights': total_insights, |
|
'active_projects': active_projects |
|
}, |
|
'recent_activity': { |
|
'insights_last_30_days': recent_insights, |
|
'activity_rate': recent_insights / 30.0 |
|
}, |
|
'collaboration_distribution': collab_type_dist, |
|
'organization': self.organization_name |
|
} |
|
|
|
|
|
def create_research_collaboration_manager(organization_name: str, **kwargs) -> ResearchCollaborationManager: |
|
"""Create research collaboration manager with configuration""" |
|
return ResearchCollaborationManager(organization_name, **kwargs) |
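

# Illustrative end-to-end sketch (requires a reachable local Redis instance and
# writable "keys/" and "configs/" directories; all names below are hypothetical).
if __name__ == "__main__":
    async def _demo() -> None:
        manager = create_research_collaboration_manager("ExampleOrg")

        participant = CollaborationParticipant(
            participant_id="part_demo1",
            organization="ExampleOrg",
            name="Demo Researcher",
            email="researcher@example.org",
            role=ParticipantRole.COORDINATOR,
            public_key="",  # would normally hold the participant's PEM public key
            expertise_areas=["malware analysis"],
            research_interests=[CollaborationType.THREAT_INTELLIGENCE_SHARING],
            data_sharing_policy={},
        )
        manager.repository.save_participant(participant)

        project_id = await manager.create_collaboration_project(
            name="Demo TI Exchange",
            description="Sketch of a threat-intelligence sharing project",
            collaboration_type=CollaborationType.THREAT_INTELLIGENCE_SHARING,
            coordinator_id=participant.participant_id,
            participants=[participant.participant_id],
        )

        insight_id = await manager.contribute_insight(
            project_id=project_id,
            title="Suspicious beaconing pattern",
            description="Periodic callbacks observed in lab traffic",
            findings={"beacon_interval_seconds": 60},
            evidence=[{"source": "lab_pcap"}],
            methodology={"approach": "traffic analysis"},
            contributor_id=participant.participant_id,
        )
        print(f"Contributed insight: {insight_id}")
        print(manager.generate_collaboration_report(project_id))

    asyncio.run(_demo())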
|
|