import boto3
import csv
from decimal import Decimal
import datetime
from tools.config import AWS_REGION, ACCESS_LOG_DYNAMODB_TABLE_NAME, FEEDBACK_LOG_DYNAMODB_TABLE_NAME, USAGE_LOG_DYNAMODB_TABLE_NAME, OUTPUT_FOLDER
# Table, region, and output path come from tools.config
TABLE_NAME = USAGE_LOG_DYNAMODB_TABLE_NAME  # choose the log table to export
REGION = AWS_REGION
CSV_OUTPUT = OUTPUT_FOLDER + 'dynamodb_logs_export.csv'
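# Note: OUTPUT_FOLDER is assumed to end with a path separator; if it may not,
# os.path.join(OUTPUT_FOLDER, 'dynamodb_logs_export.csv') is the safer form.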
# Create DynamoDB resource
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table(TABLE_NAME)
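# Credentials are resolved through the standard boto3 chain
# (environment variables, ~/.aws/credentials, or an attached IAM role).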
# Helper to make DynamoDB items CSV-friendly: Decimals become int/float,
# and ISO 8601 date strings become a uniform timestamp format
def convert_types(item):
    new_item = {}
    for key, value in item.items():
        # Handle Decimals first
        if isinstance(value, Decimal):
            new_item[key] = int(value) if value % 1 == 0 else float(value)
        # Handle strings that might be dates
        elif isinstance(value, str):
            try:
                # Attempt to parse a common ISO 8601 format.
                # The .replace() handles the 'Z' suffix for Zulu/UTC time.
                dt_obj = datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))
                # Format the datetime with millisecond precision
                new_item[key] = dt_obj.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            except (ValueError, TypeError):
                # If parsing fails, it's just a regular string
                new_item[key] = value
        # Pass all other types through unchanged
        else:
            new_item[key] = value
    return new_item
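# Illustrative example (values are hypothetical):
#   convert_types({'count': Decimal('5'), 'ts': '2024-01-01T12:00:00Z'})
#   -> {'count': 5, 'ts': '2024-01-01 12:00:00.000'}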
# Paginated scan
def scan_table():
    items = []
    response = table.scan()
    items.extend(response['Items'])
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    return items
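# Each scan() call returns at most 1 MB of data; LastEvaluatedKey marks where
# the next page starts, which is why the loop above keeps re-issuing the scan.
# A server-side filter could shrink the export; a minimal sketch, assuming a
# string attribute named 'timestamp' (hypothetical for this table):
#
#   from boto3.dynamodb.conditions import Attr
#   response = table.scan(FilterExpression=Attr('timestamp').begins_with('2024'))
#
# Filtering is applied after the items are read, so it reduces the returned
# payload, not the consumed read capacity.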
# Export to CSV
def export_to_csv(items, output_path, fields_to_drop: list = None):
    if not items:
        print("No items found.")
        return
    # Use a set for efficient lookup
    drop_set = set(fields_to_drop or [])
    # Collect every key that appears in any item so no column is missed
    all_keys = set()
    for item in items:
        all_keys.update(item.keys())
    # Final fieldnames are all keys minus the ones to drop
    fieldnames = sorted(all_keys - drop_set)
    print("Final CSV columns will be:", fieldnames)
    # utf-8-sig adds a BOM so Excel detects the encoding correctly
    with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
        # extrasaction='ignore' silently drops keys not in fieldnames;
        # restval='' fills in rows that are missing a key
        writer = csv.DictWriter(
            csvfile,
            fieldnames=fieldnames,
            extrasaction='ignore',
            restval=''
        )
        writer.writeheader()
        for item in items:
            # convert_types may return extra (dropped) fields;
            # the writer simply ignores them
            writer.writerow(convert_types(item))
    print(f"Exported {len(items)} items to {output_path}")
# Run export
if __name__ == '__main__':
    items = scan_table()
    export_to_csv(items, CSV_OUTPUT,
                  fields_to_drop=['Query metadata - usage counts and other parameters'])