Ahmedik95316 committed on
Commit
19795d9
·
1 Parent(s): 0d2146c

Create data_validator.py

Browse files

Adding Data Validation Schemas

Files changed (1) hide show
  1. data/data_validator.py +772 -0
data/data_validator.py ADDED
@@ -0,0 +1,772 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # File: data/data_validator.py (NEW FILE)
2
+ # Comprehensive data validation pipeline with checkpoints and monitoring
3
+
4
import hashlib
import json
import logging
import re
import time
from collections import defaultdict, Counter
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Tuple, Optional, Union

import pandas as pd
from pydantic import ValidationError

# Import validation schemas
from .validation_schemas import (
    NewsArticleSchema, TextContentSchema, LabelSchema, DataSourceSchema,
    BatchValidationSchema, ValidationResultSchema, BatchValidationResultSchema,
    ValidationLevel, TextQualityLevel, DataSource, NewsLabel
)
21
+
22
+ # Configure logging
23
+ logging.basicConfig(level=logging.INFO)
24
+ logger = logging.getLogger(__name__)
25
+
26
+
27
class ValidationCheckpoint:
    """Tracks one named stage of the validation pipeline.

    Records wall-clock timing, per-article validation results, and any
    errors or warnings raised while the stage runs.
    """

    def __init__(self, name: str, description: str, validation_level: ValidationLevel = ValidationLevel.MODERATE):
        self.name = name
        self.description = description
        self.validation_level = validation_level
        # Timestamps are populated by start()/end(); None until then.
        self.start_time = None
        self.end_time = None
        # Accumulators filled while the checkpoint is active.
        self.results = []
        self.errors = []
        self.warnings = []

    def start(self):
        """Record the checkpoint start time and log it."""
        self.start_time = time.time()
        logger.info(f"Starting validation checkpoint: {self.name}")

    def end(self):
        """Record the checkpoint end time and log the elapsed duration."""
        self.end_time = time.time()
        elapsed = self.processing_time
        logger.info(f"Completed validation checkpoint: {self.name} ({elapsed:.2f}s)")

    def add_result(self, result: ValidationResultSchema):
        """Attach one per-article validation result to this checkpoint."""
        self.results.append(result)

    def add_error(self, error: str):
        """Record an error message and echo it to the log."""
        self.errors.append(error)
        logger.error(f"Checkpoint {self.name}: {error}")

    def add_warning(self, warning: str):
        """Record a warning message and echo it to the log."""
        self.warnings.append(warning)
        logger.warning(f"Checkpoint {self.name}: {warning}")

    @property
    def processing_time(self) -> float:
        """Seconds between start() and end(); 0.0 when either is unset."""
        if not (self.start_time and self.end_time):
            return 0.0
        return self.end_time - self.start_time

    @property
    def success_rate(self) -> float:
        """Fraction of recorded results that passed validation (0.0 if none)."""
        if not self.results:
            return 0.0
        passed = [r for r in self.results if r.is_valid]
        return len(passed) / len(self.results)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the checkpoint's summary to a plain dictionary."""
        return {
            'name': self.name,
            'description': self.description,
            'validation_level': self.validation_level.value,
            'processing_time': self.processing_time,
            'total_validations': len(self.results),
            'success_rate': self.success_rate,
            'error_count': len(self.errors),
            'warning_count': len(self.warnings),
            'errors': self.errors,
            'warnings': self.warnings
        }
94
+
95
+
96
class DataValidationPipeline:
    """Comprehensive data validation pipeline with checkpoints and monitoring."""

    def __init__(self, base_path: Optional[Path] = None):
        # Default to /tmp when no base path is supplied.
        self.base_path = base_path or Path("/tmp")
        self.setup_paths()
        # Named checkpoints registered via create_checkpoint().
        self.checkpoints = {}
        # In-memory validation history (persisted copies live on disk).
        self.validation_history = []
        # Running counters of quality-related events.
        self.quality_stats = defaultdict(int)
105
+
106
+ def setup_paths(self):
107
+ """Setup validation paths"""
108
+ self.logs_dir = self.base_path / "logs"
109
+ self.validation_dir = self.base_path / "validation"
110
+ self.cache_dir = self.base_path / "cache"
111
+
112
+ # Create directories
113
+ for path in [self.logs_dir, self.validation_dir, self.cache_dir]:
114
+ path.mkdir(parents=True, exist_ok=True)
115
+
116
+ # Setup file paths
117
+ self.validation_log_path = self.logs_dir / "validation_log.json"
118
+ self.validation_stats_path = self.validation_dir / "validation_statistics.json"
119
+ self.failed_validations_path = self.validation_dir / "failed_validations.json"
120
+ self.quality_report_path = self.validation_dir / "quality_report.json"
121
+
122
+ def create_checkpoint(self, name: str, description: str,
123
+ validation_level: ValidationLevel = ValidationLevel.MODERATE) -> ValidationCheckpoint:
124
+ """Create a new validation checkpoint"""
125
+ checkpoint = ValidationCheckpoint(name, description, validation_level)
126
+ self.checkpoints[name] = checkpoint
127
+ return checkpoint
128
+
129
+ def validate_single_article(self, text: str, label: int, source: str,
130
+ validation_level: ValidationLevel = ValidationLevel.MODERATE,
131
+ **metadata) -> ValidationResultSchema:
132
+ """Validate a single article with comprehensive checks"""
133
+
134
+ start_time = time.time()
135
+ errors = []
136
+ warnings = []
137
+ quality_metrics = {}
138
+
139
+ try:
140
+ # Create text content schema
141
+ text_content = TextContentSchema(text=text)
142
+ quality_metrics['word_count'] = text_content.word_count
143
+ quality_metrics['character_count'] = text_content.character_count
144
+ quality_metrics['sentence_count'] = text_content.sentence_count
145
+
146
+ except ValidationError as e:
147
+ for error in e.errors():
148
+ errors.append(f"Text validation: {error['msg']}")
149
+
150
+ try:
151
+ # Create label schema
152
+ label_info = LabelSchema(label=label)
153
+
154
+ except ValidationError as e:
155
+ for error in e.errors():
156
+ errors.append(f"Label validation: {error['msg']}")
157
+
158
+ try:
159
+ # Create source schema
160
+ source_info = DataSourceSchema(
161
+ source=DataSource(source),
162
+ timestamp=datetime.now(),
163
+ **{k: v for k, v in metadata.items() if k in ['url', 'batch_id']}
164
+ )
165
+
166
+ except ValidationError as e:
167
+ for error in e.errors():
168
+ errors.append(f"Source validation: {error['msg']}")
169
+
170
+ # Additional quality checks based on validation level
171
+ if validation_level in [ValidationLevel.MODERATE, ValidationLevel.STRICT]:
172
+
173
+ # Language detection (simplified)
174
+ if text:
175
+ english_words = {'the', 'and', 'is', 'in', 'to', 'of', 'a', 'that', 'it', 'with', 'for', 'as', 'was', 'on', 'are', 'you'}
176
+ words = set(text.lower().split())
177
+ english_ratio = len(words & english_words) / len(words) if words else 0
178
+
179
+ if english_ratio < 0.1:
180
+ warnings.append("Text may not be in English")
181
+
182
+ quality_metrics['english_ratio'] = english_ratio
183
+
184
+ # Content coherence check
185
+ if text and len(text.split()) > 10:
186
+ sentences = [s.strip() for s in text.split('.') if s.strip()]
187
+ if len(sentences) > 1:
188
+ avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences)
189
+ quality_metrics['avg_sentence_length'] = avg_sentence_length
190
+
191
+ if avg_sentence_length < 3:
192
+ warnings.append("Very short average sentence length")
193
+ elif avg_sentence_length > 50:
194
+ warnings.append("Very long average sentence length")
195
+
196
+ if validation_level == ValidationLevel.STRICT:
197
+
198
+ # Advanced quality checks
199
+ if text:
200
+ # Check for AI-generated patterns (simplified)
201
+ ai_indicators = ['as an ai', 'i am an artificial', 'generated by', 'chatgpt', 'gpt-3', 'gpt-4']
202
+ if any(indicator in text.lower() for indicator in ai_indicators):
203
+ warnings.append("Text may be AI-generated")
204
+
205
+ # Check for template patterns
206
+ template_patterns = [r'\{[^}]+\}', r'\[[^\]]+\]', r'<[^>]+>']
207
+ import re
208
+ for pattern in template_patterns:
209
+ if re.search(pattern, text):
210
+ warnings.append("Text contains template patterns")
211
+ break
212
+
213
+ # Check readability (simplified Flesch reading ease)
214
+ words = text.split()
215
+ sentences = len([s for s in text.split('.') if s.strip()])
216
+ syllables = sum(max(1, len([c for c in word if c.lower() in 'aeiouy'])) for word in words)
217
+
218
+ if sentences > 0 and words:
219
+ avg_sentence_length = len(words) / sentences
220
+ avg_syllables = syllables / len(words)
221
+
222
+ # Simplified Flesch score
223
+ flesch_score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * avg_syllables)
224
+ quality_metrics['flesch_score'] = flesch_score
225
+
226
+ if flesch_score < 30:
227
+ warnings.append("Text is very difficult to read")
228
+ elif flesch_score > 90:
229
+ warnings.append("Text is very easy to read (may be simplistic)")
230
+
231
+ # Calculate overall quality score
232
+ quality_score = self._calculate_quality_score(quality_metrics, errors, warnings)
233
+ quality_metrics['overall_quality_score'] = quality_score
234
+
235
+ # Determine if validation passed
236
+ is_valid = len(errors) == 0
237
+ processing_time = time.time() - start_time
238
+
239
+ return ValidationResultSchema(
240
+ is_valid=is_valid,
241
+ errors=errors,
242
+ warnings=warnings,
243
+ quality_metrics=quality_metrics,
244
+ validation_level=validation_level,
245
+ processing_time=processing_time
246
+ )
247
+
248
    def validate_batch(self, articles_data: List[Dict[str, Any]],
                       batch_id: Optional[str] = None,
                       validation_level: ValidationLevel = ValidationLevel.MODERATE) -> BatchValidationResultSchema:
        """Validate a batch of articles with comprehensive reporting.

        Args:
            articles_data: One dict per article; 'text', 'label' and 'source'
                are consumed directly, all remaining keys are passed through
                as metadata to validate_single_article.
            batch_id: Optional identifier; autogenerated from a timestamp plus
                an md5 prefix of the input when omitted.
            validation_level: Strictness forwarded to per-article validation.

        Returns:
            BatchValidationResultSchema aggregating per-article results,
            valid/invalid counts and quality/source distributions. Also
            appends to the on-disk log and statistics as a side effect.
        """

        if not batch_id:
            batch_id = f"batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{hashlib.md5(str(articles_data).encode()).hexdigest()[:8]}"

        logger.info(f"Starting batch validation: {batch_id} ({len(articles_data)} articles)")

        # Create validation checkpoint for timing and error accounting
        checkpoint = self.create_checkpoint(
            f"batch_validation_{batch_id}",
            f"Batch validation for {len(articles_data)} articles",
            validation_level
        )
        checkpoint.start()

        validation_results = []
        valid_count = 0
        invalid_count = 0
        quality_distribution = Counter()
        source_distribution = Counter()

        # Validate each article; a failure on one article is recorded on the
        # checkpoint and counted invalid without aborting the whole batch.
        for i, article_data in enumerate(articles_data):
            try:
                text = article_data.get('text', '')
                label = article_data.get('label', 0)
                source = article_data.get('source', 'unknown')

                # Extract metadata (everything beyond the three core fields)
                metadata = {k: v for k, v in article_data.items()
                            if k not in ['text', 'label', 'source']}

                # Validate article
                result = self.validate_single_article(
                    text, label, source, validation_level, **metadata
                )

                validation_results.append(result)
                checkpoint.add_result(result)

                if result.is_valid:
                    valid_count += 1
                else:
                    invalid_count += 1

                # Update distributions: bucket the 0..1 quality score into
                # coarse high/medium/low/invalid bands
                quality_score = result.quality_metrics.get('overall_quality_score', 0)
                if quality_score >= 0.8:
                    quality_level = 'high'
                elif quality_score >= 0.6:
                    quality_level = 'medium'
                elif quality_score >= 0.4:
                    quality_level = 'low'
                else:
                    quality_level = 'invalid'

                quality_distribution[quality_level] += 1
                source_distribution[source] += 1

            except Exception as e:
                error_msg = f"Failed to validate article {i}: {str(e)}"
                checkpoint.add_error(error_msg)
                invalid_count += 1

        checkpoint.end()

        # Calculate overall quality score (mean over all per-article scores)
        if validation_results:
            quality_scores = [r.quality_metrics.get('overall_quality_score', 0) for r in validation_results]
            overall_quality_score = sum(quality_scores) / len(quality_scores)
        else:
            overall_quality_score = 0.0

        # Create validation summary
        validation_summary = {
            'batch_id': batch_id,
            'total_articles': len(articles_data),
            'validation_level': validation_level.value,
            'processing_time': checkpoint.processing_time,
            'success_rate': checkpoint.success_rate,
            'error_count': len(checkpoint.errors),
            'warning_count': len(checkpoint.warnings),
            'quality_metrics': {
                'average_quality_score': overall_quality_score,
                'quality_distribution': dict(quality_distribution),
                'source_distribution': dict(source_distribution)
            }
        }

        # Create batch validation result
        batch_result = BatchValidationResultSchema(
            batch_id=batch_id,
            total_articles=len(articles_data),
            valid_articles=valid_count,
            invalid_articles=invalid_count,
            validation_results=validation_results,
            overall_quality_score=overall_quality_score,
            quality_distribution=dict(quality_distribution),
            source_distribution=dict(source_distribution),
            validation_summary=validation_summary
        )

        # Log batch validation (rolling JSON log on disk)
        self._log_batch_validation(batch_result)

        # Update persisted aggregate statistics
        self._update_validation_statistics(batch_result)

        logger.info(f"Batch validation completed: {batch_id} "
                    f"({valid_count}/{len(articles_data)} valid, "
                    f"quality: {overall_quality_score:.3f})")

        return batch_result
364
+
365
+ def validate_dataframe(self, df: pd.DataFrame,
366
+ validation_level: ValidationLevel = ValidationLevel.MODERATE,
367
+ batch_id: Optional[str] = None) -> BatchValidationResultSchema:
368
+ """Validate a pandas DataFrame"""
369
+
370
+ # Convert DataFrame to list of dictionaries
371
+ articles_data = df.to_dict('records')
372
+
373
+ return self.validate_batch(articles_data, batch_id, validation_level)
374
+
375
+ def validate_csv_file(self, file_path: Path,
376
+ validation_level: ValidationLevel = ValidationLevel.MODERATE,
377
+ batch_id: Optional[str] = None) -> BatchValidationResultSchema:
378
+ """Validate articles from a CSV file"""
379
+
380
+ try:
381
+ df = pd.read_csv(file_path)
382
+ if batch_id is None:
383
+ batch_id = f"csv_{file_path.stem}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
384
+
385
+ return self.validate_dataframe(df, validation_level, batch_id)
386
+
387
+ except Exception as e:
388
+ logger.error(f"Failed to validate CSV file {file_path}: {e}")
389
+ raise
390
+
391
+ def validate_scraped_data(self, scraped_data: List[Dict[str, Any]],
392
+ source_name: str = "scraped_data") -> BatchValidationResultSchema:
393
+ """Validate scraped data with specific checks for web content"""
394
+
395
+ # Create checkpoint for scraped data validation
396
+ checkpoint = self.create_checkpoint(
397
+ f"scraped_validation_{source_name}",
398
+ f"Validation for scraped data from {source_name}",
399
+ ValidationLevel.MODERATE
400
+ )
401
+ checkpoint.start()
402
+
403
+ # Add scraped-specific validation logic
404
+ enhanced_data = []
405
+ for item in scraped_data:
406
+ # Ensure required fields
407
+ if 'source' not in item:
408
+ item['source'] = 'scraped_real'
409
+ if 'label' not in item:
410
+ item['label'] = 0 # Default to real for scraped news
411
+
412
+ enhanced_data.append(item)
413
+
414
+ result = self.validate_batch(
415
+ enhanced_data,
416
+ f"scraped_{source_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
417
+ ValidationLevel.MODERATE
418
+ )
419
+
420
+ checkpoint.end()
421
+
422
+ # Additional scraped data quality checks
423
+ if result.overall_quality_score < 0.6:
424
+ checkpoint.add_warning(f"Low quality scraped data: {result.overall_quality_score:.3f}")
425
+
426
+ # Check for suspicious patterns in scraped data
427
+ suspicious_count = 0
428
+ for validation_result in result.validation_results:
429
+ if any('suspicious' in warning.lower() for warning in validation_result.warnings):
430
+ suspicious_count += 1
431
+
432
+ if suspicious_count > len(scraped_data) * 0.1: # More than 10% suspicious
433
+ checkpoint.add_warning(f"High number of suspicious articles: {suspicious_count}/{len(scraped_data)}")
434
+
435
+ return result
436
+
437
+ def _calculate_quality_score(self, quality_metrics: Dict[str, Any],
438
+ errors: List[str], warnings: List[str]) -> float:
439
+ """Calculate overall quality score based on metrics and issues"""
440
+
441
+ base_score = 1.0
442
+
443
+ # Penalize for errors and warnings
444
+ base_score -= len(errors) * 0.2
445
+ base_score -= len(warnings) * 0.05
446
+
447
+ # Adjust based on content metrics
448
+ word_count = quality_metrics.get('word_count', 0)
449
+ if word_count < 20:
450
+ base_score -= 0.3
451
+ elif word_count < 50:
452
+ base_score -= 0.1
453
+ elif word_count > 1000:
454
+ base_score += 0.1
455
+
456
+ # Adjust based on readability
457
+ flesch_score = quality_metrics.get('flesch_score')
458
+ if flesch_score:
459
+ if 30 <= flesch_score <= 70: # Good readability range
460
+ base_score += 0.1
461
+ elif flesch_score < 10 or flesch_score > 90: # Poor readability
462
+ base_score -= 0.15
463
+
464
+ # Adjust based on English content ratio
465
+ english_ratio = quality_metrics.get('english_ratio')
466
+ if english_ratio:
467
+ if english_ratio >= 0.3:
468
+ base_score += 0.05
469
+ else:
470
+ base_score -= 0.1
471
+
472
+ return max(0.0, min(1.0, base_score))
473
+
474
+ def _log_batch_validation(self, batch_result: BatchValidationResultSchema):
475
+ """Log batch validation results"""
476
+ try:
477
+ log_entry = {
478
+ 'timestamp': datetime.now().isoformat(),
479
+ 'batch_id': batch_result.batch_id,
480
+ 'total_articles': batch_result.total_articles,
481
+ 'valid_articles': batch_result.valid_articles,
482
+ 'success_rate': batch_result.success_rate,
483
+ 'overall_quality_score': batch_result.overall_quality_score,
484
+ 'validation_summary': batch_result.validation_summary
485
+ }
486
+
487
+ # Load existing logs
488
+ logs = []
489
+ if self.validation_log_path.exists():
490
+ try:
491
+ with open(self.validation_log_path, 'r') as f:
492
+ logs = json.load(f)
493
+ except:
494
+ logs = []
495
+
496
+ logs.append(log_entry)
497
+
498
+ # Keep only last 1000 entries
499
+ if len(logs) > 1000:
500
+ logs = logs[-1000:]
501
+
502
+ # Save logs
503
+ with open(self.validation_log_path, 'w') as f:
504
+ json.dump(logs, f, indent=2)
505
+
506
+ except Exception as e:
507
+ logger.error(f"Failed to log batch validation: {e}")
508
+
509
+ def _update_validation_statistics(self, batch_result: BatchValidationResultSchema):
510
+ """Update validation statistics"""
511
+ try:
512
+ # Load existing stats
513
+ stats = {}
514
+ if self.validation_stats_path.exists():
515
+ try:
516
+ with open(self.validation_stats_path, 'r') as f:
517
+ stats = json.load(f)
518
+ except:
519
+ stats = {}
520
+
521
+ # Initialize stats if empty
522
+ if not stats:
523
+ stats = {
524
+ 'total_validations': 0,
525
+ 'total_articles': 0,
526
+ 'total_valid_articles': 0,
527
+ 'average_quality_score': 0.0,
528
+ 'validation_history': [],
529
+ 'quality_trends': [],
530
+ 'source_statistics': {},
531
+ 'last_updated': None
532
+ }
533
+
534
+ # Update statistics
535
+ stats['total_validations'] += 1
536
+ stats['total_articles'] += batch_result.total_articles
537
+ stats['total_valid_articles'] += batch_result.valid_articles
538
+
539
+ # Update average quality score
540
+ current_avg = stats['average_quality_score']
541
+ total_validations = stats['total_validations']
542
+ stats['average_quality_score'] = (
543
+ (current_avg * (total_validations - 1) + batch_result.overall_quality_score) /
544
+ total_validations
545
+ )
546
+
547
+ # Add to history
548
+ history_entry = {
549
+ 'timestamp': datetime.now().isoformat(),
550
+ 'batch_id': batch_result.batch_id,
551
+ 'quality_score': batch_result.overall_quality_score,
552
+ 'success_rate': batch_result.success_rate,
553
+ 'article_count': batch_result.total_articles
554
+ }
555
+
556
+ stats['validation_history'].append(history_entry)
557
+ stats['quality_trends'].append({
558
+ 'timestamp': datetime.now().isoformat(),
559
+ 'quality_score': batch_result.overall_quality_score
560
+ })
561
+
562
+ # Keep only last 100 history entries
563
+ if len(stats['validation_history']) > 100:
564
+ stats['validation_history'] = stats['validation_history'][-100:]
565
+ if len(stats['quality_trends']) > 100:
566
+ stats['quality_trends'] = stats['quality_trends'][-100:]
567
+
568
+ # Update source statistics
569
+ for source, count in batch_result.source_distribution.items():
570
+ if source not in stats['source_statistics']:
571
+ stats['source_statistics'][source] = {'total_articles': 0, 'total_validations': 0}
572
+
573
+ stats['source_statistics'][source]['total_articles'] += count
574
+ stats['source_statistics'][source]['total_validations'] += 1
575
+
576
+ stats['last_updated'] = datetime.now().isoformat()
577
+
578
+ # Save updated stats
579
+ with open(self.validation_stats_path, 'w') as f:
580
+ json.dump(stats, f, indent=2)
581
+
582
+ except Exception as e:
583
+ logger.error(f"Failed to update validation statistics: {e}")
584
+
585
+ def get_validation_statistics(self) -> Dict[str, Any]:
586
+ """Get current validation statistics"""
587
+ try:
588
+ if self.validation_stats_path.exists():
589
+ with open(self.validation_stats_path, 'r') as f:
590
+ return json.load(f)
591
+ return {}
592
+ except Exception as e:
593
+ logger.error(f"Failed to load validation statistics: {e}")
594
+ return {}
595
+
596
+ def get_validation_history(self, limit: int = 50) -> List[Dict[str, Any]]:
597
+ """Get validation history"""
598
+ try:
599
+ if self.validation_log_path.exists():
600
+ with open(self.validation_log_path, 'r') as f:
601
+ logs = json.load(f)
602
+ return logs[-limit:] if limit else logs
603
+ return []
604
+ except Exception as e:
605
+ logger.error(f"Failed to load validation history: {e}")
606
+ return []
607
+
608
    def generate_quality_report(self) -> Dict[str, Any]:
        """Generate a comprehensive quality report from persisted statistics.

        Returns the report dict, which is also written to
        `quality_report_path`; on failure returns {'error': ...}.
        """
        try:
            stats = self.get_validation_statistics()

            if not stats:
                return {'error': 'No validation statistics available'}

            # Calculate trends: mean of the last 10 quality scores minus the
            # mean of the 10 before them (when at least 20 are available).
            quality_trends = stats.get('quality_trends', [])
            if len(quality_trends) >= 2:
                recent_scores = [t['quality_score'] for t in quality_trends[-10:]]
                older_scores = [t['quality_score'] for t in quality_trends[-20:-10]] if len(quality_trends) >= 20 else []

                recent_avg = sum(recent_scores) / len(recent_scores)
                # With no older window, the trend is measured against itself (0).
                older_avg = sum(older_scores) / len(older_scores) if older_scores else recent_avg

                quality_trend = recent_avg - older_avg
            else:
                quality_trend = 0.0

            # Generate report
            report = {
                'report_timestamp': datetime.now().isoformat(),
                'overall_statistics': {
                    'total_validations': stats.get('total_validations', 0),
                    'total_articles': stats.get('total_articles', 0),
                    # max(..., 1) guards against division by zero.
                    'overall_success_rate': (stats.get('total_valid_articles', 0) /
                                             max(stats.get('total_articles', 1), 1)),
                    'average_quality_score': stats.get('average_quality_score', 0.0),
                    'quality_trend': quality_trend
                },
                'source_breakdown': stats.get('source_statistics', {}),
                'recent_performance': {
                    'last_10_validations': quality_trends[-10:] if quality_trends else [],
                    'recent_average_quality': (sum(t['quality_score'] for t in quality_trends[-10:]) /
                                               len(quality_trends[-10:])) if quality_trends else 0.0
                },
                'quality_assessment': self._assess_overall_quality(stats),
                'recommendations': self._generate_recommendations(stats)
            }

            # Save report alongside the other validation artifacts
            with open(self.quality_report_path, 'w') as f:
                json.dump(report, f, indent=2)

            return report

        except Exception as e:
            logger.error(f"Failed to generate quality report: {e}")
            return {'error': str(e)}
659
+
660
+ def _assess_overall_quality(self, stats: Dict[str, Any]) -> Dict[str, Any]:
661
+ """Assess overall data quality"""
662
+ avg_quality = stats.get('average_quality_score', 0.0)
663
+ success_rate = stats.get('total_valid_articles', 0) / max(stats.get('total_articles', 1), 1)
664
+
665
+ if avg_quality >= 0.8 and success_rate >= 0.9:
666
+ quality_level = 'excellent'
667
+ quality_color = 'green'
668
+ elif avg_quality >= 0.6 and success_rate >= 0.8:
669
+ quality_level = 'good'
670
+ quality_color = 'blue'
671
+ elif avg_quality >= 0.4 and success_rate >= 0.6:
672
+ quality_level = 'fair'
673
+ quality_color = 'yellow'
674
+ else:
675
+ quality_level = 'poor'
676
+ quality_color = 'red'
677
+
678
+ return {
679
+ 'quality_level': quality_level,
680
+ 'quality_color': quality_color,
681
+ 'average_score': avg_quality,
682
+ 'success_rate': success_rate,
683
+ 'assessment': f"Data quality is {quality_level} with {success_rate:.1%} validation success rate"
684
+ }
685
+
686
+ def _generate_recommendations(self, stats: Dict[str, Any]) -> List[str]:
687
+ """Generate quality improvement recommendations"""
688
+ recommendations = []
689
+
690
+ avg_quality = stats.get('average_quality_score', 0.0)
691
+ success_rate = stats.get('total_valid_articles', 0) / max(stats.get('total_articles', 1), 1)
692
+
693
+ if avg_quality < 0.6:
694
+ recommendations.append("Improve data source quality - consider additional content filters")
695
+
696
+ if success_rate < 0.8:
697
+ recommendations.append("Review validation criteria - high failure rate detected")
698
+
699
+ source_stats = stats.get('source_statistics', {})
700
+ if source_stats:
701
+ # Find problematic sources
702
+ for source, source_info in source_stats.items():
703
+ if source_info.get('total_articles', 0) > 10: # Only check sources with enough data
704
+ # This is simplified - in practice you'd track success rates per source
705
+ pass
706
+
707
+ if len(recommendations) == 0:
708
+ recommendations.append("Data quality is satisfactory - continue current practices")
709
+
710
+ return recommendations
711
+
712
+ def cleanup_old_logs(self, days_to_keep: int = 30):
713
+ """Clean up old validation logs"""
714
+ try:
715
+ cutoff_date = datetime.now() - timedelta(days=days_to_keep)
716
+
717
+ # Clean validation logs
718
+ if self.validation_log_path.exists():
719
+ with open(self.validation_log_path, 'r') as f:
720
+ logs = json.load(f)
721
+
722
+ filtered_logs = []
723
+ for log in logs:
724
+ try:
725
+ log_date = datetime.fromisoformat(log['timestamp'])
726
+ if log_date > cutoff_date:
727
+ filtered_logs.append(log)
728
+ except:
729
+ # Keep logs with invalid timestamps
730
+ filtered_logs.append(log)
731
+
732
+ with open(self.validation_log_path, 'w') as f:
733
+ json.dump(filtered_logs, f, indent=2)
734
+
735
+ logger.info(f"Cleaned up validation logs: kept {len(filtered_logs)}/{len(logs)} entries")
736
+
737
+ except Exception as e:
738
+ logger.error(f"Failed to cleanup old logs: {e}")
739
+
740
+
741
+ # Convenience functions for external use
742
def validate_text(text: str, label: int, source: str = "user_input",
                  validation_level: ValidationLevel = ValidationLevel.MODERATE) -> ValidationResultSchema:
    """Validate one text sample using a throwaway pipeline instance."""
    pipeline = DataValidationPipeline()
    return pipeline.validate_single_article(text, label, source, validation_level)
747
+
748
+
749
def validate_articles_list(articles: List[Dict[str, Any]],
                           validation_level: ValidationLevel = ValidationLevel.MODERATE) -> BatchValidationResultSchema:
    """Validate a list of article dicts using a throwaway pipeline instance."""
    pipeline = DataValidationPipeline()
    return pipeline.validate_batch(articles, validation_level=validation_level)
754
+
755
+
756
def validate_csv(file_path: str,
                 validation_level: ValidationLevel = ValidationLevel.MODERATE) -> BatchValidationResultSchema:
    """Validate a CSV file of articles using a throwaway pipeline instance."""
    pipeline = DataValidationPipeline()
    return pipeline.validate_csv_file(Path(file_path), validation_level)
761
+
762
+
763
def get_validation_stats() -> Dict[str, Any]:
    """Return the persisted validation statistics via a fresh pipeline."""
    pipeline = DataValidationPipeline()
    return pipeline.get_validation_statistics()
767
+
768
+
769
def generate_quality_report() -> Dict[str, Any]:
    """Build and return the quality report via a fresh pipeline instance."""
    pipeline = DataValidationPipeline()
    return pipeline.generate_quality_report()