"""Utility functions for Summary calculations.
This module contains utility functions for both Content Flow Tracker and Entity Analysis,
extracted and merged from the previous content/utils.py and entity/utils.py files.
"""
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List
from collections import defaultdict

from models.database import article_collection, entity_collection # pylint: disable=import-error

# Entity type full names mapping
ENTITY_TYPE_FULL_NAMES = {
    "GPE": "Geopolitical Entities (Countries/Cities)",
    "LOC": "Locations (Non-political)",
    "ORG": "Organizations",
    "PERSON": "People",
    "PROD": "Products",
    "PRODUCT": "Products",
    "PRODCAT": "Product Categories",
    "PRODUCT_CATEGORY": "Product Categories",
    "COMPANY": "Companies",
    "FINANCIAL_ASSET": "Financial Assets",
    "ECONOMIC_INDICATOR": "Economic Indicators",
    "EVENT": "Events",
    "LANGUAGE": "Languages",
    "NORP": "Nationalities/Religious/Political Groups",
    "LAW": "Laws/Legal Documents",
    "FAC": "Facilities/Landmarks",
    "INDUSTRY": "Industries",
}

# Allowed entity types for analysis
ALLOWED_ENTITY_TYPES = {
    "GPE", "ORG", "PERSON", "COMPANY", 
    "FINANCIAL_ASSET", "ECONOMIC_INDICATOR", "INDUSTRY"
}

# Load entity normalization mapping
def _load_entity_mapping() -> Dict[str, str]:
    """Load entity normalization mapping from JSON file."""
    try:
        with open("mapping.json", 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def normalize_entity_name(entity_name: str) -> str:
    """
    Normalize entity names using the mapping file.
    
    Parameters
    ----------
    entity_name : str
        The original entity name to normalize.
        
    Returns
    -------
    str
        The normalized entity name, or original if no mapping found.
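
    Examples
    --------
    Illustrative only; the final result also depends on ``mapping.json``
    (assumed here to contain no entry for these names)::

        normalize_entity_name("U.S. Federal Reserve")   # -> "US Federal Reserve"
        normalize_entity_name("consumer price index")   # -> "CPI"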
    """
    if not entity_name:
        return entity_name

    # Convert to string and clean
    normalized = str(entity_name).strip()

    # Apply basic replacements
    normalized = normalized.replace("U.S.", "US")
    normalized = normalized.replace("consumer price index", "CPI")
    normalized = normalized.replace("Gross Domestic Product", "GDP")

    # Load and apply mapping
    mapping = _load_entity_mapping()
    return mapping.get(normalized, normalized)


def aggregate_entities(entities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Aggregate duplicate entities by summing their occurrence counts.
    
    Parameters
    ----------
    entities : List[Dict[str, Any]]
        A list of entity dictionaries where each dictionary must contain:
        - 'entity' (str): The name of the entity
        - 'type' (str): The type/category of the entity
        - 'occurrence' (int): The count of occurrences for this entity
        
    Returns
    -------
    List[Dict[str, Any]]
        A list of unique entity dictionaries with aggregated occurrence counts,
        where each dictionary contains:
        - 'entity' (str): The normalized entity name
        - 'type' (str): The entity type (unchanged)
        - 'occurrence' (int): The summed occurrence count across all duplicates
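
    Examples
    --------
    Illustrative sketch; assumes ``mapping.json`` has no extra entries, so both
    spellings below normalize to ``"US"``::

        aggregate_entities([
            {"entity": "U.S.", "type": "GPE", "occurrence": 3},
            {"entity": "US", "type": "GPE", "occurrence": 2},
        ])
        # -> [{"entity": "US", "type": "GPE", "occurrence": 5}]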
    """
    aggregated = {}

    for entity in entities:
        # Normalize entity name
        normalized_name = normalize_entity_name(entity['entity'])
        key = (normalized_name, entity['type'])

        if key in aggregated:
            aggregated[key] += entity['occurrence']
        else:
            aggregated[key] = entity['occurrence']

    # Convert back to list of dictionaries
    result = []
    for (entity_name, entity_type), count in aggregated.items():
        result.append({
            'entity': entity_name,
            'type': entity_type,
            'occurrence': count
        })

    return result


def _build_sentiment_lookup(sentiment_results: list) -> Dict:
    """Build sentiment lookup dictionary from sentiment aggregation results."""
    sentiment_lookup = {}
    for result in sentiment_results:
        key = (result["_id"]["entity"], result["_id"]["type"])
        sentiment_lookup[key] = round(result["avgSentiment"], 3)
    return sentiment_lookup


def _process_entity_with_sentiment(mentions_result: Dict, sentiment_lookup: Dict) -> Dict[str, Any]:
    """Process a single entity result and add sentiment information."""
    entity_id = mentions_result["_id"]
    entity_key = (entity_id["entity"], entity_id["type"])

    return {
        "entityName": entity_id["entity"].replace("_", " "),
        "mentions": mentions_result["mentions"],
        "sentiment": sentiment_lookup.get(entity_key)
    }


def _get_latest_publish_date_from_collection(collection) -> datetime:
    """Return the latest publish date found in the specified collection.

    Parameters
    ----------
    collection:
        MongoDB collection to query for the latest publishDate.

    Returns
    -------
    datetime
        Latest publish date found, or current date if collection is empty.
    """
    latest_doc = collection.find_one(
        sort=[("publishDate", -1)], projection={"publishDate": 1}
    )
    if latest_doc and "publishDate" in latest_doc:
        return datetime.strptime(latest_doc["publishDate"], "%Y-%m-%d")
    return datetime.today()


def _time_range(filter_type: str, collection) -> tuple[str, str]:
    """Calculate *inclusive* start / end date strings using rolling window approach.
    
    Uses rolling window logic:
    - today: only the latest date
    - weekly: latest date - 6 days (total 7 days)
    - monthly: latest date - 29 days (total 30 days)

    Parameters
    ----------
    filter_type:
        One of ``today``, ``week``/``weekly`` or ``month``/``monthly``.  Any
        unrecognised value will fall back to *all time* where the start date is
        ``datetime.min``.
    collection:
        MongoDB collection to get the latest date from.

    Returns
    -------
    tuple[str, str]
        Start and end dates as strings in YYYY-MM-DD format.
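
    Examples
    --------
    Illustrative only; assumes the newest ``publishDate`` in the collection is
    ``"2024-05-31"``::

        _time_range("today", collection)    # -> ("2024-05-31", "2024-05-31")
        _time_range("weekly", collection)   # -> ("2024-05-25", "2024-05-31")
        _time_range("monthly", collection)  # -> ("2024-05-02", "2024-05-31")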
    """
    latest_date = _get_latest_publish_date_from_collection(collection)

    if filter_type in {"today"}:
        start = latest_date.date()
    elif filter_type in {"week", "weekly"}:
        # Latest date minus 6 days (total 7 days)
        start = (latest_date - timedelta(days=6)).date()
    elif filter_type in {"month", "monthly"}:
        # Latest date minus 29 days (total 30 days)
        start = (latest_date - timedelta(days=29)).date()
    else:
        start = datetime.min.date()

    return str(start), str(latest_date.date())



def get_content_flow_data(time_filter: str) -> Dict[str, Any]:
    """Return aggregated *Content Flow Tracker* data for the given period.

    Uses a rolling-window approach:
    - today: only the latest date
    - weekly: latest date - 6 days (total 7 days)  
    - monthly: latest date - 29 days (total 30 days)

    Parameters
    ----------
    time_filter:
        Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time).

    Returns
    -------
    Dict[str, Any]
        Dictionary containing title, dateRange, and aggregated content flow data.
    """
    start, end = _time_range(time_filter, article_collection)

    pipeline = [
        {"$match": {"publishDate": {"$gte": start, "$lte": end}}},
        {"$group": {"_id": {"source": "$site", "category": "$category"}, "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]

    results = list(article_collection.aggregate(pipeline))
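
    # Each aggregation result looks roughly like (illustrative values only):
    #     {"_id": {"source": "example-site", "category": "Markets"}, "count": 12}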

    data = [
        {
            "category": r["_id"].get("category", "Uncategorized"),
            "source": r["_id"]["source"],
            "count": r["count"],
        }
        for r in results
    ]

    return {
        "title": f"Content Flow Tracker  {time_filter.capitalize()}",
        "dateRange": {"start": start, "end": end},
        "data": data,
    }


def get_entity_analysis_data(time_filter: str) -> Dict[str, Any]:
    """Return *Entity Analysis* data for the given period with sentiment information.

    Uses a rolling-window approach:
    - today: only the latest date
    - weekly: latest date - 6 days (total 7 days)  
    - monthly: latest date - 29 days (total 30 days)

    Parameters
    ----------
    time_filter:
        Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time).

    Returns
    -------
    Dict[str, Any]
        Dictionary containing title, dateRange, and aggregated entity analysis data with sentiment.
    """
    start, end = _time_range(time_filter, entity_collection)

    # Get mentions count pipeline
    mentions_pipeline = [
        {"$match": {"publishDate": {"$gte": start, "$lte": end}}},
        {"$group": {"_id": {"entity": "$entity", "type": "$entityType"},
                   "mentions": {"$sum": "$occurrence"}}},
        {"$sort": {"mentions": -1}},
    ]

    # Get sentiment data pipeline
    sentiment_pipeline = [
        {"$match": {
            "publishDate": {"$gte": start, "$lte": end},
            "sentimentScore": {"$exists": True, "$ne": None}
        }},
        {"$group": {
            "_id": {"entity": "$entity", "type": "$entityType"},
            "avgSentiment": {"$avg": "$sentimentScore"},

        }}
    ]

    mentions_results = list(entity_collection.aggregate(mentions_pipeline))
    sentiment_results = list(entity_collection.aggregate(sentiment_pipeline))
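
    # Illustrative shapes of the raw aggregation output (values are made up):
    #     mentions_results[i]:  {"_id": {"entity": "Apple", "type": "COMPANY"}, "mentions": 17}
    #     sentiment_results[i]: {"_id": {"entity": "Apple", "type": "COMPANY"}, "avgSentiment": 0.42}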

    # Filter to only include allowed entity types
    mentions_results = [r for r in mentions_results if r["_id"]["type"] in ALLOWED_ENTITY_TYPES]
    sentiment_results = [r for r in sentiment_results if r["_id"]["type"] in ALLOWED_ENTITY_TYPES]

    # Convert mentions results to format expected by aggregate_entities
    entities_for_aggregation = []
    for result in mentions_results:
        entities_for_aggregation.append({
            'entity': result['_id']['entity'],
            'type': result['_id']['type'],
            'occurrence': result['mentions']
        })

    # Normalize and aggregate entities
    aggregated_entities = aggregate_entities(entities_for_aggregation)

    # Rebuild mentions results with normalized names
    normalized_mentions_results = []
    for agg_entity in aggregated_entities:
        normalized_mentions_results.append({
            '_id': {'entity': agg_entity['entity'], 'type': agg_entity['type']},
            'mentions': agg_entity['occurrence']
        })

    # Rebuild sentiment lookup with normalized names, averaging the scores of all
    # raw entities that collapse onto the same normalized (entity, type) key
    sentiment_scores: Dict[tuple, List[float]] = defaultdict(list)
    for result in sentiment_results:
        normalized_name = normalize_entity_name(result["_id"]["entity"])
        key = (normalized_name, result["_id"]["type"])
        sentiment_scores[key].append(result["avgSentiment"])

    normalized_sentiment_lookup = {
        key: sum(scores) / len(scores)
        for key, scores in sentiment_scores.items()
    }

    entity_types: Dict[str, Any] = {}
    for mentions_result in normalized_mentions_results:
        entity_type = mentions_result["_id"]["type"]

        if entity_type not in entity_types:
            entity_types[entity_type] = {
                "fullName": ENTITY_TYPE_FULL_NAMES.get(entity_type, entity_type),
                "entities": [],
            }

        entity_types[entity_type]["entities"].append(
            _process_entity_with_sentiment(mentions_result, normalized_sentiment_lookup)
        )

    # Keep only the top 10 per type
    for entity_data in entity_types.values():
        entity_data["entities"] = sorted(
            entity_data["entities"], key=lambda x: -x["mentions"]
        )[:10]
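
    # The returned "data" mapping is shaped like (illustrative values only):
    #     {"COMPANY": {"fullName": "Companies",
    #                  "entities": [{"entityName": "Example Corp",
    #                                "mentions": 42, "sentiment": 0.21}]}}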

    return {
        "title": f"Top Entities - {time_filter.capitalize()}",
        "dateRange": {"start": start, "end": end},
        "data": entity_types,
    }


def get_sentiment_analysis_data(time_filter: str) -> Dict[str, Any]:
    """Return aggregated *Sentiment Analysis* data for articles by category for the given period.

    Uses a rolling-window approach:
    - today: only the latest date
    - weekly: latest date - 6 days (total 7 days)  
    - monthly: latest date - 29 days (total 30 days)

    Parameters
    ----------
    time_filter:
        Time period filter ('today', 'week'/'weekly', 'month'/'monthly', or any other for all time).

    Returns
    -------
    Dict[str, Any]
        Dictionary containing title, dateRange, and sentiment data by category and date.
    """
    start, end = _time_range(time_filter, article_collection)

    # Derive the window start date and its length in days for the selected filter
    if time_filter == "today":
        start_date = datetime.strptime(end, "%Y-%m-%d").date()
        num_days = 1
    elif time_filter in {"week", "weekly"}:
        start_date = datetime.strptime(start, "%Y-%m-%d").date()
        num_days = 7
    elif time_filter in {"month", "monthly"}:
        start_date = datetime.strptime(start, "%Y-%m-%d").date()
        num_days = 30
    else:
        start_date = datetime.strptime(start, "%Y-%m-%d").date()
        end_date = datetime.strptime(end, "%Y-%m-%d").date()
        num_days = (end_date - start_date).days + 1

    # Query articles with sentiment scores
    query = {
        "publishDate": {"$gte": start, "$lte": end},
        "sentimentScore": {"$exists": True}
    }

    daily_scores = defaultdict(lambda: defaultdict(list))

    # Aggregate sentiment scores by category and date
    for doc in list(article_collection.find(query)):
        category = doc.get("category", "Unknown")
        score = doc.get("sentimentScore")
        if category and score is not None and doc.get("publishDate"):
            daily_scores[category][doc.get("publishDate")].append(score)

    # Generate nested data structure: date -> category -> sentiment
    data = {}
    for i in range(num_days):
        day = (start_date + timedelta(days=i)).isoformat()
        data[day] = {}
        for category in daily_scores:
            scores = daily_scores[category].get(day, [])
            if scores:  # only include dates where the category has scores
                data[day][category] = sum(scores) / len(scores)
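
    # "data" ends up keyed date -> category (illustrative values only):
    #     {"2024-05-30": {"Markets": 0.12, "Tech": -0.05},
    #      "2024-05-31": {"Markets": 0.03}}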

    return {
        "title": f"Sentiment Analysis by Category  {time_filter.capitalize()}",
        "dateRange": {"start": start, "end": end},
        "data": data
    }