"""

Modal Labs GPU Processing

GPU-accelerated sentiment analysis and text processing

"""

import modal
import asyncio
import logging
import os
from typing import List, Dict, Any
import numpy as np
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

logger = logging.getLogger(__name__)

# Modal app definition
app = modal.App("product-feature-agent")

# Define the Modal image with all required dependencies
image = (
    modal.Image.debian_slim()
    .pip_install([
        "torch>=2.1.0",
        "transformers>=4.35.0",
        "numpy>=1.24.0",
        "scikit-learn>=1.3.0",
        "textblob>=0.17.1",
        "vaderSentiment>=3.3.2",
        "pandas>=2.1.0",
        "accelerate>=0.24.0",
        "python-dotenv>=1.0.0"
    ])
    .run_commands([
        "python -c \"import nltk; nltk.download('punkt'); nltk.download('punkt_tab'); nltk.download('vader_lexicon'); nltk.download('averaged_perceptron_tagger')\"",
        "python -m textblob.download_corpora",
        "python -c \"import textblob; textblob.TextBlob('test').tags\""
    ])
)
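# Note: the run_commands above bake the NLTK and TextBlob corpora into the
# image at build time, so containers don't re-download them on each cold start.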

# Shared volume for model caching
model_volume = modal.Volume.from_name("feature-agent-models", create_if_missing=True)
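# The volume persists Hugging Face model weights under /models across runs,
# so the sentiment checkpoint downloads once instead of on every cold start.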

@app.function(
    image=image,
    gpu="T4",  # Use T4 GPU for cost efficiency
    memory=4096,  # 4GB RAM
    timeout=300,  # 5 minute timeout
    volumes={"/models": model_volume},
    min_containers=1  # Keep one instance warm for faster response
)
def gpu_batch_sentiment_analysis(texts: List[str], batch_size: int = 32) -> Dict[str, Any]:
    """
    Perform GPU-accelerated sentiment analysis on a batch of texts.

    Args:
        texts: List of text strings to analyze
        batch_size: Batch size for processing

    Returns:
        Dict with per-text results, a batch summary, and processing stats
    """
    import torch
    from transformers.pipelines import pipeline
    from transformers import AutoTokenizer, AutoModelForSequenceClassification
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    from textblob import TextBlob
    import time
    
    start_time = time.time()
    logger.info(f"Starting GPU sentiment analysis for {len(texts)} texts")
    
    try:
        # Initialize VADER analyzer (CPU-based, fast for social media text)
        vader_analyzer = SentimentIntensityAnalyzer()
        
        # Load transformer model for more sophisticated analysis
        model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
        cache_dir = "/models/sentiment"
        
        # Use GPU if available
        device = 0 if torch.cuda.is_available() else -1
        
        # Initialize sentiment pipeline with caching
        sentiment_pipeline = pipeline(
            "sentiment-analysis",
            model=model_name,
            tokenizer=model_name,
            device=device,
            model_kwargs={"cache_dir": cache_dir},
            max_length=512,
            truncation=True
        )
        
        results = []
        
        # Process texts in batches
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            batch_results = []
            
            # GPU-accelerated transformer analysis
            transformer_results = sentiment_pipeline(batch_texts)
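            # Each entry of transformer_results is a dict such as
            # {"label": "positive", "score": 0.98}; this checkpoint exposes
            # lowercase negative/neutral/positive labels.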
            
            # Process each text in the batch
            for j, text in enumerate(batch_texts):
                if not text or len(text.strip()) == 0:
                    batch_results.append({
                        "text": text,
                        "error": "Empty text",
                        "processing_time": 0
                    })
                    continue
                
                text_start = time.time()
                
                # VADER sentiment (good for social media, slang, emojis)
                vader_scores = vader_analyzer.polarity_scores(text)
                
                # TextBlob sentiment (grammar-based)
                try:
                    blob = TextBlob(text)
                    textblob_sentiment = blob.sentiment
                except Exception:
                    textblob_sentiment = None
                
                # Transformer sentiment (deep learning)
                transformer_result = transformer_results[j] # type: ignore
                
                # Map transformer labels to consistent format
                transformer_label = transformer_result["label"].lower() # type: ignore
                if transformer_label in ["positive", "pos"]:
                    transformer_sentiment = "positive"
                elif transformer_label in ["negative", "neg"]:
                    transformer_sentiment = "negative"
                else:
                    transformer_sentiment = "neutral"
                
                # Combine all sentiment scores
                combined_result = {
                    "text": text[:100] + "..." if len(text) > 100 else text,  # Truncate for storage
                    "vader": {
                        "compound": vader_scores["compound"],
                        "positive": vader_scores["pos"],
                        "negative": vader_scores["neg"],
                        "neutral": vader_scores["neu"]
                    },
                    "textblob": {
                        "polarity": textblob_sentiment.polarity if textblob_sentiment else 0, # type: ignore
                        "subjectivity": textblob_sentiment.subjectivity if textblob_sentiment else 0 # type: ignore
                    },
                    "transformer": {
                        "label": transformer_sentiment,
                        "confidence": transformer_result["score"] # type: ignore
                    },
                    "consensus": _calculate_consensus_sentiment(
                        vader_scores["compound"],
                        textblob_sentiment.polarity if textblob_sentiment else 0, # type: ignore
                        transformer_sentiment,
                        transformer_result["score"] # type: ignore
                    ),
                    "processing_time": time.time() - text_start
                }
                
                batch_results.append(combined_result)
            
            results.extend(batch_results)
            
            # Log progress
            processed = min(i + batch_size, len(texts))
            logger.info(f"Processed {processed}/{len(texts)} texts")
        
        total_time = time.time() - start_time
        logger.info(f"GPU sentiment analysis completed in {total_time:.2f}s")
        
        # Add summary statistics
        summary = _calculate_batch_summary(results)
        
        return {
            "results": results,
            "summary": summary,
            "processing_stats": {
                "total_texts": len(texts),
                "total_time": total_time,
                "texts_per_second": len(texts) / total_time,
                "gpu_used": torch.cuda.is_available(),
                "model_used": model_name
            }
        }
        
    except Exception as e:
        logger.error(f"Error in GPU sentiment analysis: {str(e)}")
        return {
            "error": str(e),
            "processing_stats": {
                "total_texts": len(texts),
                "total_time": time.time() - start_time,
                "gpu_used": False
            }
        }

def _calculate_consensus_sentiment(vader_compound: float, textblob_polarity: float,
                                   transformer_label: str, transformer_confidence: float) -> Dict[str, Any]:
    """Calculate consensus sentiment from multiple models"""
    
    # Convert transformer to numeric
    transformer_score = 0
    if transformer_label == "positive":
        transformer_score = transformer_confidence
    elif transformer_label == "negative":
        transformer_score = -transformer_confidence
    
    # Weight the scores (transformer gets higher weight due to confidence)
    weights = {
        "vader": 0.3,
        "textblob": 0.2,
        "transformer": 0.5
    }
    
    weighted_score = (
        vader_compound * weights["vader"] +
        textblob_polarity * weights["textblob"] +
        transformer_score * weights["transformer"]
    )
    
    # Classify final sentiment
    if weighted_score >= 0.1:
        consensus_label = "positive"
    elif weighted_score <= -0.1:
        consensus_label = "negative"
    else:
        consensus_label = "neutral"
    
    # Calculate confidence based on agreement
    scores = [vader_compound, textblob_polarity, transformer_score]
    agreement = 1.0 - (np.std(scores) / 2.0)  # lower std dev across models => higher agreement
    
    return {
        "label": consensus_label,
        "score": weighted_score,
        "confidence": max(0.0, min(1.0, agreement)) # type: ignore
    }
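
# Worked example (hypothetical inputs): vader_compound=0.8, textblob_polarity=0.5,
# transformer ("positive", confidence=0.9):
#   weighted_score = 0.8*0.3 + 0.5*0.2 + 0.9*0.5 = 0.79  -> "positive"
#   agreement     = 1 - std([0.8, 0.5, 0.9]) / 2 ≈ 1 - 0.17/2 ≈ 0.91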

def _calculate_batch_summary(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Calculate summary statistics for batch results"""
    if not results:
        return {}
    
    valid_results = [r for r in results if "error" not in r]
    
    if not valid_results:
        return {"error": "No valid results"}
    
    # Count sentiment labels
    sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0}
    total_confidence = 0
    total_processing_time = 0
    
    for result in valid_results:
        consensus = result.get("consensus", {})
        label = consensus.get("label", "neutral")
        confidence = consensus.get("confidence", 0)
        
        sentiment_counts[label] += 1
        total_confidence += confidence
        total_processing_time += result.get("processing_time", 0)
    
    total_valid = len(valid_results)
    
    return {
        "total_analyzed": total_valid,
        "sentiment_distribution": {
            "positive": sentiment_counts["positive"],
            "negative": sentiment_counts["negative"],
            "neutral": sentiment_counts["neutral"],
            "positive_pct": (sentiment_counts["positive"] / total_valid) * 100,
            "negative_pct": (sentiment_counts["negative"] / total_valid) * 100,
            "neutral_pct": (sentiment_counts["neutral"] / total_valid) * 100
        },
        "average_confidence": total_confidence / total_valid,
        "average_processing_time": total_processing_time / total_valid,
        "dominant_sentiment": max(sentiment_counts, key=sentiment_counts.get) # type: ignore
    }

@app.function(
    image=image,
    gpu="T4",
    memory=2048,
    timeout=180
)
def gpu_keyword_extraction(texts: List[str], max_keywords: int = 10) -> Dict[str, Any]:
    """
    Extract keywords with TF-IDF and noun-phrase extraction.

    Runs inside the GPU-backed Modal container; the extraction itself is CPU-bound.
    """
    from textblob import TextBlob
    from sklearn.feature_extraction.text import TfidfVectorizer
    import time
    
    start_time = time.time()
    logger.info(f"Starting GPU keyword extraction for {len(texts)} texts")
    
    try:
        # Combine all texts
        combined_text = " ".join(texts)
        
        # Use TextBlob for noun phrase extraction
        blob = TextBlob(combined_text)
        noun_phrases = list(blob.noun_phrases) # type: ignore
        
        # Use TF-IDF for important terms
        vectorizer = TfidfVectorizer(
            max_features=max_keywords * 2,
            ngram_range=(1, 3),
            stop_words="english"
        )
        
        if texts:
            tfidf_matrix = vectorizer.fit_transform(texts)
            feature_names = vectorizer.get_feature_names_out()
            tfidf_scores = tfidf_matrix.sum(axis=0).A1 # type: ignore
            
            # Get top TF-IDF terms
            top_indices = tfidf_scores.argsort()[-max_keywords:][::-1]
            tfidf_keywords = [(feature_names[i], tfidf_scores[i]) for i in top_indices]
        else:
            tfidf_keywords = []
        
        # Combine and rank keywords
        all_keywords = {}
        
        # Add noun phrases
        for phrase in noun_phrases:
            if len(phrase) > 3:  # Filter short phrases
                all_keywords[phrase] = all_keywords.get(phrase, 0) + 1
        
        # Add TF-IDF terms
        for term, score in tfidf_keywords:
            all_keywords[term] = all_keywords.get(term, 0) + score
        
        # Sort by importance
        sorted_keywords = sorted(all_keywords.items(), key=lambda x: x[1], reverse=True)
        
        result = {
            "keywords": sorted_keywords[:max_keywords],
            "total_texts": len(texts),
            "processing_time": time.time() - start_time,
            "method": "hybrid_tfidf_nlp"
        }
        
        logger.info(f"Keyword extraction completed in {result['processing_time']:.2f}s")
        return result
        
    except Exception as e:
        logger.error(f"Error in keyword extraction: {str(e)}")
        return {"error": str(e)}

class GPUProcessor:
    """Client interface for Modal GPU processing"""
    
    def __init__(self):
        """Initialize GPU processor client"""
        self.app = app
        self.modal_available = False
        self.setup_modal_client()
    
    def setup_modal_client(self):
        """Setup Modal client with credentials and test connection"""
        logger.info("βš™οΈ Modal Labs client setup - Started")
        
        try:
            # Check for Modal token
            modal_token = os.getenv("MODAL_TOKEN")
            
            if not modal_token:
                error_msg = "No Modal token found in environment variables (MODAL_TOKEN)"
                logger.error(f"❌ Modal Labs API failed: {error_msg}")
                self.modal_available = False
                return
            
            logger.info(f"πŸ”Œ Attempting to connect to Modal Labs API with token ending in ...{modal_token[-4:]}")
            
            # Test Modal connection by trying to import and validate
            try:
                import modal
                # Instantiate a throwaway App to validate the client install;
                # note this does not actually contact Modal's servers
                test_app = modal.App("connection-test")
                logger.info("βœ… Modal Labs API connected successfully - client initialized")
                self.modal_available = True
                logger.info("βš™οΈ Modal Labs client setup - Completed")
                
            except Exception as modal_error:
                error_msg = f"Modal Labs connection test failed: {str(modal_error)}"
                logger.error(f"❌ Modal Labs API failed: {error_msg}")
                self.modal_available = False
                
        except Exception as e:
            error_msg = f"Modal Labs client setup failed: {str(e)}"
            logger.error(f"❌ Modal Labs API failed: {error_msg}")
            logger.error("βš™οΈ Modal Labs client setup - Failed")
            self.modal_available = False
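
    # Note: the Modal CLI normally reads credentials from ~/.modal.toml (written
    # by `modal token new`) or the MODAL_TOKEN_ID / MODAL_TOKEN_SECRET env vars;
    # the single MODAL_TOKEN variable above is this project's own gating check.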
        
    async def batch_sentiment_analysis(self, data_sources: List[Any]) -> Dict[str, Any]:
        """
        Process multiple data sources with sentiment analysis (GPU if available,
        falling back to local analysis).

        Args:
            data_sources: List of data from different collectors

        Returns:
            Comprehensive sentiment analysis organized by source
        """
        try:
            # Extract texts from different data sources
            all_texts = []
            source_mapping = {}
            
            for i, source in enumerate(data_sources):
                source_texts = self._extract_texts_from_source(source)
                
                # Track which texts come from which source
                start_idx = len(all_texts)
                all_texts.extend(source_texts)
                end_idx = len(all_texts)
                
                source_mapping[f"source_{i}"] = {
                    "start": start_idx,
                    "end": end_idx,
                    "count": len(source_texts),
                    "type": self._identify_source_type(source),
                    "data": source  # Store original data for fallback
                }
            
            if not all_texts:
                return {"error": "No texts found in data sources"}
            
            # Try Modal GPU processing if available
            if self.modal_available:
                try:
                    logger.info(f"Sending {len(all_texts)} texts to Modal GPU processing")
                    with app.run():
                        sentiment_results = gpu_batch_sentiment_analysis.remote(all_texts)
                    
                    # Reorganize results by source
                    organized_results = self._organize_results_by_source(
                        sentiment_results, source_mapping
                    )
                    
                    return organized_results
                    
                except Exception as modal_error:
                    logger.warning(f"Modal GPU processing failed: {str(modal_error)}")
                    logger.info("Falling back to local sentiment analysis")
            
            # Fallback to local sentiment analysis
            logger.info(f"Processing {len(all_texts)} texts with local sentiment analysis")
            sentiment_results = await self._local_sentiment_analysis(all_texts)
            
            # Reorganize results by source
            organized_results = self._organize_results_by_source(
                sentiment_results, source_mapping
            )
            
            return organized_results
            
        except Exception as e:
            logger.error(f"Error in batch sentiment analysis: {str(e)}")
            return {"error": str(e)}
    
    async def extract_keywords(self, texts: List[str], max_keywords: int = 20) -> Dict[str, Any]:
        """Extract keywords using GPU acceleration"""
        try:
            with app.run():
                result = gpu_keyword_extraction.remote(texts, max_keywords)
            return result
        except Exception as e:
            logger.error(f"Error in keyword extraction: {str(e)}")
            return {"error": str(e)}
    
    async def _local_sentiment_analysis(self, texts: List[str]) -> Dict[str, Any]:
        """
        Local sentiment analysis fallback using VADER and TextBlob.
        """
        try:
            from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
            from textblob import TextBlob
            import time
            
            start_time = time.time()
            vader_analyzer = SentimentIntensityAnalyzer()
            
            results = []
            
            for text in texts:
                if not text or len(text.strip()) == 0:
                    results.append({
                        "text": text,
                        "error": "Empty text",
                        "processing_time": 0
                    })
                    continue
                
                text_start = time.time()
                
                # VADER sentiment
                vader_scores = vader_analyzer.polarity_scores(text)
                
                # TextBlob sentiment
                try:
                    blob = TextBlob(text)
                    textblob_sentiment = blob.sentiment
                except Exception:
                    textblob_sentiment = None
                
                # Simple consensus (weighted average)
                vader_compound = vader_scores["compound"]
                textblob_polarity = textblob_sentiment.polarity if textblob_sentiment else 0
                
                # Calculate consensus
                weighted_score = (vader_compound * 0.6) + (textblob_polarity * 0.4)
                
                if weighted_score >= 0.1:
                    consensus_label = "positive"
                elif weighted_score <= -0.1:
                    consensus_label = "negative"
                else:
                    consensus_label = "neutral"
                
                # Calculate confidence based on agreement
                agreement = 1.0 - abs(vader_compound - textblob_polarity) / 2.0
                confidence = max(0.0, min(1.0, agreement))
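                # e.g. vader=0.6 vs textblob=0.2 -> agreement = 1 - 0.4/2 = 0.8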
                
                result = {
                    "text": text[:100] + "..." if len(text) > 100 else text,
                    "vader": {
                        "compound": vader_compound,
                        "positive": vader_scores["pos"],
                        "negative": vader_scores["neg"],
                        "neutral": vader_scores["neu"]
                    },
                    "textblob": {
                        "polarity": textblob_polarity,
                        "subjectivity": textblob_sentiment.subjectivity if textblob_sentiment else 0
                    },
                    "consensus": {
                        "label": consensus_label,
                        "score": weighted_score,
                        "confidence": confidence
                    },
                    "processing_time": time.time() - text_start
                }
                
                results.append(result)
            
            total_time = time.time() - start_time
            
            # Calculate summary
            summary = _calculate_batch_summary(results)
            
            return {
                "results": results,
                "summary": summary,
                "processing_stats": {
                    "total_texts": len(texts),
                    "total_time": total_time,
                    "texts_per_second": len(texts) / total_time if total_time > 0 else 0,
                    "gpu_used": False,
                    "model_used": "local_vader_textblob"
                }
            }
            
        except Exception as e:
            logger.error(f"Error in local sentiment analysis: {str(e)}")
            return {"error": str(e)}
    
    def _extract_texts_from_source(self, source: Any) -> List[str]:
        """Extract text content from different data source formats"""
        texts = []
        
        if isinstance(source, dict):
            # App Store reviews
            if "apps" in source:
                for app_name, app_data in source["apps"].items():
                    if "reviews" in app_data:
                        for review in app_data["reviews"]:
                            title = review.get("title", "")
                            content = review.get("content", "")
                            combined = f"{title} {content}".strip()
                            if combined:
                                texts.append(combined)
            
            # Reddit posts
            elif "posts" in source:
                for post in source["posts"]:
                    title = post.get("title", "")
                    selftext = post.get("selftext", "")
                    combined = f"{title} {selftext}".strip()
                    if combined:
                        texts.append(combined)
            
            # News articles (check multiple possible structures)
            elif "articles" in source:
                for article in source["articles"]:
                    title = article.get("title", "")
                    description = article.get("description", "")
                    combined = f"{title} {description}".strip()
                    if combined:
                        texts.append(combined)
            
            # News search results structure
            elif "search_results" in source:
                for search_term, results in source["search_results"].items():
                    if "articles" in results:
                        for article in results["articles"]:
                            title = article.get("title", "")
                            description = article.get("description", "")
                            combined = f"{title} {description}".strip()
                            if combined:
                                texts.append(combined)
            
            # Reddit query results structure
            elif "query_results" in source:
                for query, result in source["query_results"].items():
                    if "posts" in result:
                        for post in result["posts"]:
                            title = post.get("title", "")
                            selftext = post.get("selftext", "")
                            combined = f"{title} {selftext}".strip()
                            if combined:
                                texts.append(combined)
        
        return texts
    
    def _identify_source_type(self, source: Any) -> str:
        """Identify the type of data source"""
        if isinstance(source, dict):
            if "apps" in source:
                return "app_store"
            elif "posts" in source:
                return "reddit"
            elif "articles" in source:
                return "news"
        return "unknown"
    
    def _organize_results_by_source(self, sentiment_results: Dict[str, Any],
                                    source_mapping: Dict[str, Any]) -> Dict[str, Any]:
        """Organize sentiment results by original data source"""
        if "error" in sentiment_results:
            return sentiment_results
        
        results = sentiment_results.get("results", [])
        summary = sentiment_results.get("summary", {})
        processing_stats = sentiment_results.get("processing_stats", {})
        
        organized = {
            "by_source": {},
            "overall_summary": summary,
            "processing_stats": processing_stats
        }
        
        # Split results back to sources
        for source_id, mapping in source_mapping.items():
            start_idx = mapping["start"]
            end_idx = mapping["end"]
            source_results = results[start_idx:end_idx]
            
            # Calculate source-specific summary
            source_summary = _calculate_batch_summary(source_results)
            
            organized["by_source"][source_id] = {
                "type": mapping["type"],
                "count": mapping["count"],
                "results": source_results,
                "summary": source_summary
            }
        
        return organized

# Example usage and testing functions
async def test_gpu_processor():
    """Test function for GPU processor"""
    processor = GPUProcessor()
    
    # Test data
    test_texts = [
        "This product is amazing! I love using it every day.",
        "Terrible experience, would not recommend to anyone.",
        "It's okay, nothing special but does the job.",
        "Outstanding features and great customer service!",
        "Complete waste of money, very disappointed."
    ]
    
    # Test sentiment analysis
    print("Testing GPU sentiment analysis...")
    mock_sources = [
        {"posts": [{"title": text, "selftext": ""} for text in test_texts[:3]]},
        {"articles": [{"title": text, "description": ""} for text in test_texts[3:]]}
    ]
    
    sentiment_result = await processor.batch_sentiment_analysis(mock_sources)
    print(f"Sentiment analysis completed: {sentiment_result.get('processing_stats', {}).get('total_texts', 0)} texts processed")
    
    # Test keyword extraction
    print("Testing GPU keyword extraction...")
    keyword_result = await processor.extract_keywords(test_texts)
    print(f"Keyword extraction: {len(keyword_result.get('keywords', []))} keywords found")
    
    return sentiment_result, keyword_result

if __name__ == "__main__":
    # Run test
    asyncio.run(test_gpu_processor())