import boto3

# The star import is expected to provide aws_access_key_id,
# aws_secret_access_key, data_structure, db_map_to_py_dict and
# py_dict_to_db_map, none of which are defined in this module.
from utility import *

# Initialize the DynamoDB client shared by every helper below.
db_client = boto3.client(
    "dynamodb",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name="us-east-1",
)


# ---------------------------------------------------------------------------
# DynamoDB data operations
# ---------------------------------------------------------------------------


def _call_with_converted(operation, table_name: str, argument_name: str, value: dict):
    """Run a single-item DynamoDB operation and report failures as a dict.

    Args:
        operation: Bound client method to call (e.g. ``db_client.put_item``).
        table_name: Value passed as ``TableName``.
        argument_name: Keyword under which the converted value is passed
            (``"Item"`` or ``"Key"``).
        value: Plain Python dict, converted with ``py_dict_to_db_map``.

    Returns:
        The raw client response, or ``{"Error": exception}`` if the
        conversion or the call raised.
    """
    try:
        # The conversion stays inside the try block so that conversion
        # failures are reported the same way as service failures.
        return operation(
            **{"TableName": table_name, argument_name: py_dict_to_db_map(value)}
        )
    except Exception as e:  # callers rely on the {"Error": ...} contract
        return {"Error": e}


def get_table(table_name: str) -> list:
    """Return every item of ``table_name`` as a list of Python dicts.

    Only the attributes listed in ``data_structure[table_name]["fields"]``
    are requested. A single Scan call returns at most one page of results,
    so the scan is repeated from ``LastEvaluatedKey`` until no further page
    is reported; otherwise large tables would be silently truncated.

    Raises:
        Whatever ``db_client.scan`` raises; errors are not caught here.
    """
    scan_kwargs = {
        "TableName": table_name,
        "AttributesToGet": data_structure[table_name]["fields"],
    }
    items = []
    while True:
        page = db_client.scan(**scan_kwargs)
        items.extend(db_map_to_py_dict(r) for r in page["Items"])
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            break
        # Resume the scan right after the last key of the previous page.
        scan_kwargs["ExclusiveStartKey"] = last_key
    return items


def post_item(table_name: str, item: dict):
    """Add ``item`` to ``table_name`` via PutItem.

    Returns the raw ``put_item`` response, or ``{"Error": exception}`` on
    failure.
    """
    return _call_with_converted(db_client.put_item, table_name, "Item", item)


def put_item(table_name: str, item: dict):
    """Update ``item`` in ``table_name`` via PutItem.

    This issues exactly the same request as ``post_item``. Returns the raw
    ``put_item`` response, or ``{"Error": exception}`` on failure.
    """
    return _call_with_converted(db_client.put_item, table_name, "Item", item)


def delete_item(table_name: str, key: dict):
    """Delete the item identified by ``key`` from ``table_name``.

    Returns the raw ``delete_item`` response, or ``{"Error": exception}``
    if the call raised.

    NOTE(review): no condition expression is sent, and DynamoDB's DeleteItem
    does not fail for a missing key in that case, so a non-existent item is
    presumably NOT reported as an error - confirm against callers.
    """
    return _call_with_converted(db_client.delete_item, table_name, "Key", key)


def get_item(table_name: str, key: dict):
    """Fetch the item identified by ``key`` from ``table_name``.

    Returns the raw ``get_item`` response (not converted with
    ``db_map_to_py_dict``), or ``{"Error": exception}`` on failure.
    """
    return _call_with_converted(db_client.get_item, table_name, "Key", key)


# ---------------------------------------------------------------------------
# DynamoDB structure management
# ---------------------------------------------------------------------------


def get_structure(table_name: str):
    """Return the ``AttributeDefinitions`` of ``table_name``.

    Raises:
        Whatever ``db_client.describe_table`` raises; errors are not caught.
    """
    description = db_client.describe_table(TableName=table_name)
    return description["Table"]["AttributeDefinitions"]