File size: 3,842 Bytes
a322d7b 754cdf8 1bfa10c 2646146 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
"""Utility functions for FinFast data collection from DynamoDB to MongoDB"""
import os
import boto3
from boto3.dynamodb.types import TypeDeserializer
from pymongo.errors import PyMongoError
from botocore.exceptions import ClientError, BotoCoreError
def get_dynamodb_client(region_name='us-east-1'):
    """
    Create and return a DynamoDB client using AWS credentials from environment variables.

    The access key id and secret are read from the ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables. If either variable is
    unset, ``None`` is passed for it to ``boto3.client``.

    Args:
        region_name (str): AWS region the client targets. Defaults to
            'us-east-1', so existing zero-argument callers keep working.

    Returns:
        boto3.client: Configured DynamoDB client
    """
    return boto3.client(
        service_name='dynamodb',
        region_name=region_name,
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    )
def scan_dynamodb_table(table_name, filter_date, item_processor):
    """
    Scan a DynamoDB table page by page and hand every matching item to a callback.

    Only items satisfying the filter expression ``publishDate >= :date`` are
    returned by the scan, where ``:date`` is sent as a DynamoDB string value.
    Each page is fully converted from DynamoDB's typed attribute format into
    plain Python values before the callback is invoked for that page's items.

    Args:
        table_name (str): Name of the DynamoDB table to scan
        filter_date (str): Filter date in 'YYYY-MM-DD' format
        item_processor (callable): Called once per deserialized item (a dict)

    Raises:
        ClientError: If there is an error during DynamoDB scan operation
        BotoCoreError: If there is a boto3 core error
    """
    client = get_dynamodb_client()
    resume_key = None
    to_python = TypeDeserializer().deserialize

    def build_request(start_key):
        # A fresh parameter dict is assembled for every page request.
        request = {
            'TableName': table_name,
            'FilterExpression': 'publishDate >= :date',
            'ExpressionAttributeValues': {
                ':date': {'S': filter_date}
            }
        }
        # Only continuation requests carry the key of the previous page.
        if start_key:
            request['ExclusiveStartKey'] = start_key
        return request

    try:
        more_pages = True
        while more_pages:
            page = client.scan(**build_request(resume_key))

            # Convert the whole page first, then process it.
            records = [
                {name: to_python(raw) for name, raw in record.items()}
                for record in page.get('Items', [])
            ]
            for record in records:
                item_processor(record)

            # A missing/empty LastEvaluatedKey means the scan is complete.
            resume_key = page.get('LastEvaluatedKey')
            more_pages = bool(resume_key)
    except (ClientError, BotoCoreError) as e:
        # Report the failure, then let the caller handle the exception.
        print(f"Error in scan operation: {e}")
        raise
def delete_old_documents(collection, cutoff_date, use_logger=None):
    """
    Remove every document whose ``publishDate`` is strictly less than the cutoff.

    The outcome (or the failure) is reported through ``use_logger`` when one
    is given, and through ``print`` otherwise.

    Args:
        collection: MongoDB collection object
        cutoff_date (str): The cutoff date in 'YYYY-MM-DD' format
        use_logger: Optional logger object exposing ``info`` and ``error``

    Raises:
        PyMongoError: If there is an error during the delete operation
            (reported first, then re-raised unchanged)
    """
    stale_filter = {'publishDate': {'$lt': cutoff_date}}
    try:
        outcome = collection.delete_many(stale_filter)
        removed = outcome.deleted_count
        if not use_logger:
            print(f"Deleted {removed} documents older than {cutoff_date}")
        else:
            use_logger.info("Deleted %d documents older than %s", removed, cutoff_date)
    except PyMongoError as e:
        if not use_logger:
            print(f"Error deleting old documents: {e}")
        else:
            use_logger.error("Error deleting old documents: %s", e)
        raise
def upsert_item(collection, item):
    """
    Insert or update a single document in a MongoDB collection, keyed by its ``_id``.

    Args:
        collection: MongoDB collection object
        item (dict): Document to write; must contain an '_id' key

    Raises:
        KeyError: If ``item`` has no '_id' key (raised before any write)
    """
    doc_id = item['_id']
    selector = {'_id': doc_id}
    changes = {'$set': item}
    # upsert=True asks for an insert when no document matches the selector.
    collection.update_one(selector, changes, upsert=True)
    print(f"Successfully processed item: {doc_id}")
|