File size: 3,842 Bytes
a322d7b
754cdf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1bfa10c
2646146
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Utility functions for FinFast data collection from DynamoDB to MongoDB"""

import os
import boto3
from boto3.dynamodb.types import TypeDeserializer
from pymongo.errors import PyMongoError
from botocore.exceptions import ClientError, BotoCoreError


def get_dynamodb_client(region_name='us-east-1'):
    """
    Create and return a DynamoDB client using AWS credentials from environment variables.

    Reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN. The
    session token must be passed along with the key pair: when temporary (STS)
    credentials are in use, AWS rejects an explicit key/secret that arrives
    without its matching token. Unset variables are passed as None.

    Args:
        region_name (str): AWS region for the client. Defaults to 'us-east-1'.

    Returns:
        boto3.client: Configured DynamoDB client
    """
    return boto3.client(
        service_name='dynamodb',
        region_name=region_name,
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        # None when unset, i.e. long-lived credentials without a session token.
        aws_session_token=os.getenv('AWS_SESSION_TOKEN'),
    )


def scan_dynamodb_table(table_name, filter_date, item_processor, use_logger=None):
    """
    Scan a DynamoDB table and process items using the provided processor function.

    Follows LastEvaluatedKey pagination until the table is exhausted. Each page's
    items are converted from DynamoDB's typed wire format to plain Python values
    with TypeDeserializer before being handed, one at a time, to item_processor.

    Args:
        table_name (str): Name of the DynamoDB table to scan
        filter_date (str): Filter date in 'YYYY-MM-DD' format; items whose
            'publishDate' string compares >= this value are returned
        item_processor (callable): Function called with each deserialized item (dict)
        use_logger: Optional logger object. If provided, will use logger, otherwise use print

    Raises:
        ClientError: If there is an error during DynamoDB scan operation
        BotoCoreError: If there is a boto3 core error
    """
    dynamodb = get_dynamodb_client()
    deserializer = TypeDeserializer()

    # The filter is the same for every page; only ExclusiveStartKey varies.
    # The comparison is a string ('S') comparison, which orders correctly for
    # ISO-style 'YYYY-MM-DD' dates.
    base_params = {
        'TableName': table_name,
        'FilterExpression': 'publishDate >= :date',
        'ExpressionAttributeValues': {
            ':date': {'S': filter_date}
        },
    }
    last_evaluated_key = None

    try:
        while True:
            scan_params = dict(base_params)
            # Resume from where the previous page stopped.
            if last_evaluated_key:
                scan_params['ExclusiveStartKey'] = last_evaluated_key

            response = dynamodb.scan(**scan_params)
            # NOTE(review): TypeDeserializer yields decimal.Decimal for DynamoDB
            # numbers; confirm item_processor converts them if it writes to MongoDB.
            items = [
                {k: deserializer.deserialize(v) for k, v in item.items()}
                for item in response.get('Items', [])
            ]

            for item in items:
                item_processor(item)

            # Absent LastEvaluatedKey means this was the final page.
            last_evaluated_key = response.get('LastEvaluatedKey')
            if not last_evaluated_key:
                break

    except (ClientError, BotoCoreError) as e:
        if use_logger:
            use_logger.error("Error in scan operation: %s", e)
        else:
            print(f"Error in scan operation: {e}")
        raise


def delete_old_documents(collection, cutoff_date, use_logger=None):
    """
    Remove every document whose 'publishDate' is earlier than cutoff_date.

    The comparison is MongoDB's '$lt' on the stored value, so with string dates
    it is a lexicographic comparison against cutoff_date.

    Args:
        collection: MongoDB collection object to prune
        cutoff_date (str): The cutoff date in 'YYYY-MM-DD' format
        use_logger: Optional logger object; when falsy, messages are printed instead

    Raises:
        PyMongoError: Re-raised after being reported if the delete fails
    """
    stale_query = {'publishDate': {'$lt': cutoff_date}}
    try:
        removed = collection.delete_many(stale_query).deleted_count
        if not use_logger:
            print(f"Deleted {removed} documents older than {cutoff_date}")
        else:
            use_logger.info("Deleted %d documents older than %s", removed, cutoff_date)
    except PyMongoError as err:
        if not use_logger:
            print(f"Error deleting old documents: {err}")
        else:
            use_logger.error("Error deleting old documents: %s", err)
        raise

def upsert_item(collection, item, use_logger=None):
    """
    Insert or update a single document in a MongoDB collection, keyed by its '_id'.

    The whole item is applied with '$set' and upsert=True: a missing document is
    created, and an existing one has the item's fields overwritten.

    Args:
        collection: MongoDB collection object
        item (dict): Document to write; must contain an '_id' key
        use_logger: Optional logger object. If provided, will use logger, otherwise use print

    Returns:
        The result object returned by collection.update_one.
    """
    result = collection.update_one(
        {'_id': item['_id']},
        {'$set': item},
        upsert=True,
    )
    if use_logger:
        use_logger.info("Successfully processed item: %s", item['_id'])
    else:
        print(f"Successfully processed item: {item['_id']}")
    return result